From ac5d258dcc598a2ef13f0609debefae38530d14a Mon Sep 17 00:00:00 2001 From: Juan Sandoval Date: Tue, 16 Feb 2021 20:01:40 -0800 Subject: [PATCH] Beam transform that uses DebeziumIO connector to support CDC MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Debeziumio PoC (#7) * New DebeziumIO class. * Merge connector code * DebeziumIO and MySqlConnector integrated. * Added FormatFuntion param to Read builder on DebeziumIO. * Added arguments checker to DebeziumIO. * Add simple JSON mapper object (#1) * Add simple JSON mapper object * Fixed Mapper. * Add SqlServer connector test * Added PostgreSql Connector Test PostgreSql now works with Json mapper * Added PostgreSql Connector Test PostgreSql now works with Json mapper * Fixing MySQL schema DataException Using file instead of schema should fix it * MySQL Connector updated from 1.3.0 to 1.3.1 Co-authored-by: osvaldo-salinas Co-authored-by: Carlos Dominguez Co-authored-by: Carlos Domínguez * Add debeziumio tests * Debeziumio testing json mapper (#3) * Some code refactors. Use a default DBHistory if not provided * Add basic tests for Json mapper * Debeziumio time restriction (#5) * Add simple JSON mapper object * Fixed Mapper. * Add SqlServer connector test * Added PostgreSql Connector Test PostgreSql now works with Json mapper * Added PostgreSql Connector Test PostgreSql now works with Json mapper * Fixing MySQL schema DataException Using file instead of schema should fix it * MySQL Connector updated from 1.3.0 to 1.3.1 * Some code refactors. Use a default DBHistory if not provided * Adding based-time restriction Stop polling after specified amount of time * Add basic tests for Json mapper * Adding new restriction Uses a time-based restriction * Adding optional restrcition Uses an optional time-based restriction Co-authored-by: juanitodread Co-authored-by: osvaldo-salinas * Upgrade DebeziumIO connector (#4) * Address comments (Change dependencies to testCompile, Set JsonMapper/Coder as default, refactors) (#8) * Revert file * Change dependencies to testCompile * Move Counter sample to unit test * Set JsonMapper as default mapper function * Set String Coder as default coder when using JsonMapper * Change logs from info to debug * Debeziumio javadoc (#9) * Adding javadoc * Added some titles and examples * Added SourceRecordJson doc * Added Basic Connector doc * Added KafkaSourceConsumer doc * Javadoc cleanup * Removing BasicConnector No usages of this class were found overall * Editing documentation * Debeziumio fetched records restriction (#10) * Adding javadoc * Adding restriction by number of fetched records Also adding a quick-fix for null value within SourceRecords Minor fix on both MySQL and PostgreSQL Connectors Tests * Run either by time or by number of records * Added DebeziumOffsetTrackerTest Tests both restrictions: By amount of time and by Number of records * Removing comment * DebeziumIO test for DB2. (#11) * DebeziumIO test for DB2. * DebeziumIO javadoc. * Clean code:removed commented code lines on DebeziumIOConnectorTest.java * Clean code:removing unused imports and using readAsJson(). Co-authored-by: Carlos Domínguez <74681048+carlosdominguezwl@users.noreply.github.com> * Debezium limit records (now configurable) (#12) * Adding javadoc * Records Limit is now configurable (It was fixed before) * Debeziumio dockerize (#13) * Add mysql docker container to tests * Move debezium mysql integration test to its own file * Add assertion to verify that the results contains a record. 
* Debeziumio readme (#15) * Adding javadoc * Adding README file * Add number of records configuration to the DebeziumIO component (#16) * Code refactors (#17) * Remove/ignore null warnings * Remove DB2 code * Remove docker dependency in DebeziumIO unit test and max number of recods to MySql integration test * Change access modifiers accordingly * Remove incomplete integration tests (Postgres and SqlServer) * Add experimenal tag * Debezium testing stoppable consumer (#18) * Add try-catch-finally, stop SourceTask at finally. * Fix warnings * stopConsumer and processedRecords local variables removed. UT for task stop use case added * Fix minor code style issue Co-authored-by: juanitodread * Fix style issues (check, spotlessApply) (#19) Co-authored-by: Osvaldo Salinas Co-authored-by: alejandro.maguey Co-authored-by: osvaldo-salinas Co-authored-by: Carlos Dominguez Co-authored-by: Carlos Domínguez Co-authored-by: Carlos Domínguez <74681048+carlosdominguezwl@users.noreply.github.com> Co-authored-by: Alejandro Maguey Co-authored-by: Hassan Reyes Add missing apache license to README.md Enabling integration test for DebeziumIO (#20) Rename connector package cdc=>debezium. Update doc references (#21) Fix code style on DebeziumIOMySqlConnectorIT --- build.gradle.kts | 1 + sdks/java/io/debezium/build.gradle | 83 +++ sdks/java/io/debezium/src/README.md | 178 ++++++ .../apache/beam/io/debezium/DebeziumIO.java | 515 ++++++++++++++++++ .../io/debezium/KafkaSourceConsumerFn.java | 394 ++++++++++++++ .../beam/io/debezium/SourceRecordJson.java | 287 ++++++++++ .../beam/io/debezium/SourceRecordMapper.java | 31 ++ .../apache/beam/io/debezium/package-info.java | 28 + .../debezium/DebeziumIOMySqlConnectorIT.java | 108 ++++ .../beam/io/debezium/DebeziumIOTest.java | 101 ++++ .../debezium/KafkaSourceConsumerFnTest.java | 264 +++++++++ .../beam/io/debezium/OffsetTrackerTest.java | 71 +++ .../io/debezium/SourceRecordJsonTest.java | 113 ++++ settings.gradle.kts | 1 + 14 files changed, 2175 insertions(+) create mode 100644 sdks/java/io/debezium/build.gradle create mode 100644 sdks/java/io/debezium/src/README.md create mode 100644 sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumIO.java create mode 100644 sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java create mode 100644 sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/SourceRecordJson.java create mode 100644 sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/SourceRecordMapper.java create mode 100644 sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/package-info.java create mode 100644 sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOMySqlConnectorIT.java create mode 100644 sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOTest.java create mode 100644 sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/KafkaSourceConsumerFnTest.java create mode 100644 sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/OffsetTrackerTest.java create mode 100644 sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/SourceRecordJsonTest.java diff --git a/build.gradle.kts b/build.gradle.kts index c2af480ea39a1..5b51c77ba579f 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -163,6 +163,7 @@ task("javaPostCommit") { dependsOn(":runners:google-cloud-dataflow-java:postCommitRunnerV2") dependsOn(":sdks:java:extensions:google-cloud-platform-core:postCommit") 
dependsOn(":sdks:java:extensions:zetasketch:postCommit") + dependsOn(":sdks:java:io:debezium:integrationTest") dependsOn(":sdks:java:io:google-cloud-platform:postCommit") dependsOn(":sdks:java:io:kinesis:integrationTest") dependsOn(":sdks:java:extensions:ml:postCommit") diff --git a/sdks/java/io/debezium/build.gradle b/sdks/java/io/debezium/build.gradle new file mode 100644 index 0000000000000..3c43f98773580 --- /dev/null +++ b/sdks/java/io/debezium/build.gradle @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import groovy.json.JsonOutput + +plugins { id 'org.apache.beam.module' } +applyJavaNature( + automaticModuleName: 'org.apache.beam.sdk.io.debezium', + mavenRepositories: [ + [id: 'io.confluent', url: 'https://packages.confluent.io/maven/'] + ], + checkerTooSlowOnTests: true, + enableSpotbugs: false, +) +provideIntegrationTestingDependencies() + +description = "Apache Beam :: SDKs :: Java :: IO :: Debezium" +ext.summary = "Library to work with Debezium data." + +dependencies { + compile library.java.vendored_guava_26_0_jre + compile project(path: ":sdks:java:core", configuration: "shadow") + compile library.java.slf4j_api + compile library.java.joda_time + provided library.java.jackson_dataformat_csv + testCompile project(path: ":sdks:java:core", configuration: "shadowTest") + testCompile project(path: ":sdks:java:io:common", configuration: "testRuntime") + + // Test dependencies + testCompile library.java.junit + testRuntimeOnly library.java.slf4j_jdk14 + testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow") + testCompile project(":runners:google-cloud-dataflow-java") + testCompile "org.testcontainers:testcontainers:1.15.1" + testCompile "org.testcontainers:mysql:1.15.1" + + // Kafka connect dependencies + compile "org.apache.kafka:connect-api:2.5.0" + compile "org.apache.kafka:connect-json:2.5.0" + + // Debezium dependencies + compile group: 'io.debezium', name: 'debezium-core', version: '1.3.1.Final' + testCompile group: 'io.debezium', name: 'debezium-connector-mysql', version: '1.3.1.Final' +} + +test { + testLogging { + outputs.upToDateWhen {false} + showStandardStreams = true + } +} + + +task integrationTest(type: Test, dependsOn: processTestResources) { + group = "Verification" + systemProperty "beamTestPipelineOptions", JsonOutput.toJson([ + "--runner=DirectRunner", + ]) + + // Disable Gradle cache: these ITs interact with live service that should always be considered "out of date" + outputs.upToDateWhen { false } + + include '**/*IT.class' + classpath = sourceSets.test.runtimeClasspath + testClassesDirs = sourceSets.test.output.classesDirs + + useJUnit { + } +} diff --git a/sdks/java/io/debezium/src/README.md b/sdks/java/io/debezium/src/README.md new file mode 100644 index 
0000000000000..4cf9be81c618e --- /dev/null +++ b/sdks/java/io/debezium/src/README.md @@ -0,0 +1,178 @@ + + +# DebeziumIO +## Connect your Debezium Databases to Apache Beam easily. + +### What is DebeziumIO? +DebeziumIO is an Apache Beam connector that lets users connect their Events-Driven Databases on [Debezium](https://debezium.io) to [Apache Beam](https://beam.apache.org/) without the need to set up a [Kafka](https://kafka.apache.org/) instance. + +### Getting Started + +DebeziumIO uses [Debezium Connectors v1.3](https://debezium.io/documentation/reference/1.3/connectors/) to connect to Apache Beam. All you need to do is choose the Debezium Connector that suits your Debezium setup and pick a [Serializable Function](https://beam.apache.org/releases/javadoc/2.23.0/org/apache/beam/sdk/transforms/SerializableFunction.html), then you will be able to connect to Apache Beam and start building your own Pipelines. + +These connectors have been successfully tested and are known to work fine: +* MySQL Connector +* PostgreSQL Connector +* SQLServer Connector +* DB2 Connector + +Other connectors might also work. + + +Setting up a connector and running a Pipeline should be as simple as: +``` +Pipeline p = Pipeline.create(); // Create a Pipeline + p.apply(DebeziumIO.read() + .withConnectorConfiguration(...) // Debezium Connector setup + .withFormatFunction(...) // Serializable Function to use + ).setCoder(StringUtf8Coder.of()); +p.run().waitUntilFinish(); // Run your pipeline! +``` + +### Setting up a Debezium Connector + +DebeziumIO comes with a handy ConnectorConfiguration builder, which lets you provide all the configuration needed to access your Debezium Database. + +A basic configuration such as **username**, **password**, **port number**, and **host name** must be specified along with the **Debezium Connector class** you will use by using these methods: + +|Method|Param|Description| +|-|-|-| +|`.withConnectorClass(connectorClass)`|_Class_|Debezium Connector| +|`.withUsername(username)`|_String_|Database Username| +|`.withPassword(password)`|_String_|Database Password| +|`.withHostName(hostname)`|_String_|Database Hostname| +|`.withPort(portNumber)`|_String_|Database Port number| + +You can also add more configuration, such as Connector-specific Properties with the `_withConnectionProperty_` method: + +|Method|Params|Description| +|-|-|-| +|`.withConnectionProperty(propName, propValue)`|_String_, _String_|Adds a custom property to the connector.| +> **Note:** For more information on custom properties, see your [Debezium Connector](https://debezium.io/documentation/reference/1.3/connectors/) specific documentation. + +Example of a MySQL Debezium Connector setup: +``` +DebeziumIO.ConnectorConfiguration.create() + .withUsername("dbUsername") + .withPassword("dbPassword") + .withConnectorClass(MySqlConnector.class) + .withHostName("127.0.0.1") + .withPort("3306") + .withConnectionProperty("database.server.id", "serverId") + .withConnectionProperty("database.server.name", "serverName") + .withConnectionProperty("database.include.list", "dbName") + .withConnectionProperty("include.schema.changes", "false") +``` + +### Setting a Serializable Function + +A serializable function is required to depict each `SourceRecord` fetched from the Database. + +DebeziumIO comes with a built-in JSON Mapper that you can optionally use to map every `SourceRecord` fetched from the Database to a JSON object. This helps users visualize and access their data in a simple way. 
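+
+ If the built-in JSON Mapper does not fit your use case, you can supply your own mapper instead. A minimal sketch (the topic-only mapper below is purely illustrative) could look like this:
+ ```
+ // Purely illustrative: a custom mapper that keeps only each record's topic name.
+ // Any SourceRecordMapper<T> works here, as long as it is serializable.
+ SourceRecordMapper<String> topicOnlyMapper = (SourceRecord record) -> record.topic();
+
+ p.apply(DebeziumIO.<String>read()
+         .withConnectorConfiguration(config)   // a ConnectorConfiguration built as shown above
+         .withFormatFunction(topicOnlyMapper))
+     .setCoder(StringUtf8Coder.of());
+ ```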
+ +If you want to use this built-in JSON Mapper, you can do it by setting an instance of **SourceRecordJsonMapper** as a Serializable Function to the DebeziumIO: +``` +.withFormatFunction(new SourceRecordJson.SourceRecordJsonMapper()) +``` +> **Note:** `SourceRecordJsonMapper`comes out of the box, but you may use any Format Function you prefer. + +## Quick Example + +The following example is how an actual setup would look like using a **MySQL Debezium Connector** and **SourceRecordJsonMapper** as the Serializable Function. +``` +PipelineOptions options = PipelineOptionsFactory.create(); +Pipeline p = Pipeline.create(options); +p.apply(DebeziumIO.read(). + withConnectorConfiguration( // Debezium Connector setup + DebeziumIO.ConnectorConfiguration.create() + .withUsername("debezium") + .withPassword("dbz") + .withConnectorClass(MySqlConnector.class) + .withHostName("127.0.0.1") + .withPort("3306") + .withConnectionProperty("database.server.id", "184054") + .withConnectionProperty("database.server.name", "dbserver1") + .withConnectionProperty("database.include.list", "inventory") + .withConnectionProperty("include.schema.changes", "false") + ).withFormatFunction( + new SourceRecordJson.SourceRecordJsonMapper() // Serializable Function + ) +).setCoder(StringUtf8Coder.of()); + +p.run().waitUntilFinish(); +``` + +## Shortcut! + +If you will be using the built-in **SourceRecordJsonMapper** as your Serializable Function for all your pipelines, you should use **readAsJson()**. + +DebeziumIO comes with a method called `readAsJson`, which automatically sets the `SourceRecordJsonMapper` as the Serializable Function for your pipeline. This way, you would need to setup your connector before running your pipeline, without explicitly setting a Serializable Function. + +Example of using **readAsJson**: +``` +PipelineOptions options = PipelineOptionsFactory.create(); +Pipeline p = Pipeline.create(options); +p.apply(DebeziumIO.read(). + withConnectorConfiguration( // Debezium Connector setup + DebeziumIO.ConnectorConfiguration.create() + .withUsername("debezium") + .withPassword("dbz") + .withConnectorClass(MySqlConnector.class) + .withHostName("127.0.0.1") + .withPort("3306") + .withConnectionProperty("database.server.id", "184054") + .withConnectionProperty("database.server.name", "dbserver1") + .withConnectionProperty("database.include.list", "inventory") + .withConnectionProperty("include.schema.changes", "false")); + +p.run().waitUntilFinish(); +``` + +## Under the hood + +### KafkaSourceConsumerFn and Restrictions + +KafkaSourceConsumerFn (KSC onwards) is a [DoFn](https://beam.apache.org/releases/javadoc/2.3.0/org/apache/beam/sdk/transforms/DoFn.html) in charge of the Database replication and CDC. + +There are two ways of initializing KSC: +* Restricted by number of records +* Restricted by amount of time (minutes) + +By default, DebeziumIO initializes it with the former, though user may choose the latter by setting the amount of minutes as a parameter: + +|Function|Param|Description| +|-|-|-| +|`KafkaSourceConsumerFn(connectorClass, recordMapper, maxRecords)`|_Class, SourceRecordMapper, Int_|Restrict run by number of records (Default).| +|`KafkaSourceConsumerFn(connectorClass, recordMapper, timeToRun)`|_Class, SourceRecordMapper, Long_|Restrict run by amount of time (in minutes).| + +### Requirements and Supported versions + +- JDK v8 +- Debezium Connectors v1.3 +- Apache Beam 2.25 + +## Running Unit Tests + +You can run Integration Tests using **gradlew**. 
+ +Example of running the MySQL Connector Integration Test: +``` +./gradlew integrationTest -p sdks/java/io/debezium/ --tests org.apache.beam.io.debezium.DebeziumIOMySqlConnectorIT -DintegrationTestRunner=direct +``` \ No newline at end of file diff --git a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumIO.java b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumIO.java new file mode 100644 index 0000000000000..b7c084fcba78d --- /dev/null +++ b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumIO.java @@ -0,0 +1,515 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.io.debezium; + +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; + +import com.google.auto.value.AutoValue; +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.MapCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.options.ValueProvider; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; +import org.apache.kafka.connect.source.SourceConnector; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Utility class which exposes an implementation {@link #read} and a Debezium configuration. + * + *

+ * <h3>Quick Overview</h3>
+ *
+ * <p>This class lets Beam users connect to their existing Debezium implementations in an easy way.
+ *
+ * <p>Any Kafka connector supported by Debezium should work fine with this IO.
+ *
+ * <p>The following connectors were tested and worked well in some simple scenarios:
+ *
+ * <ul>
+ *   <li>MySQL
+ *   <li>PostgreSQL
+ *   <li>SQLServer
+ *   <li>DB2
+ * </ul>
+ *
+ * <h3>Usage example</h3>
+ *
+ * <p>Connect to a Debezium - MySQL database and run a Pipeline
+ *
+ * <pre>
+ *     private static final ConnectorConfiguration mySqlConnectorConfig = ConnectorConfiguration
+ *             .create()
+ *             .withUsername("uname")
+ *             .withPassword("pwd123")
+ *             .withHostName("127.0.0.1")
+ *             .withPort("3306")
+ *             .withConnectorClass(MySqlConnector.class)
+ *             .withConnectionProperty("database.server.id", "184054")
+ *             .withConnectionProperty("database.server.name", "serverid")
+ *             .withConnectionProperty("database.include.list", "dbname")
+ *             .withConnectionProperty("database.history", DebeziumSDFDatabaseHistory.class.getName())
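+ *             // Note: the database.history property is optional; when omitted, this IO
+ *             // defaults to DebeziumSDFDatabaseHistory (see getConfigurationMap()).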
+ *             .withConnectionProperty("include.schema.changes", "false");
+ *
+ *      PipelineOptions options = PipelineOptionsFactory.create();
+ *      Pipeline p = Pipeline.create(options);
+ *      p.apply(DebeziumIO.read()
+ *               .withConnectorConfiguration(mySqlConnectorConfig)
+ *               .withFormatFunction(new SourceRecordJson.SourceRecordJsonMapper())
+ *       ).setCoder(StringUtf8Coder.of());
+ *       p.run().waitUntilFinish();
+ * </pre>
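+ *
+ * <p>Alternatively, {@code readAsJson()} wires in the same JSON mapper and the String coder for
+ * you; an equivalent sketch of the pipeline above:
+ *
+ * <pre>
+ *      p.apply(DebeziumIO.readAsJson()
+ *               .withConnectorConfiguration(mySqlConnectorConfig));
+ * </pre>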

+ * <p>In this example we are using {@link KafkaSourceConsumerFn.DebeziumSDFDatabaseHistory} to
+ * handle the Database history.
+ *
+ * <h3>Dependencies</h3>
+ *
+ * <p>User may work with any of the supported Debezium Connectors mentioned above.
+ *
See Debezium + * Connectors for more info. + */ +@Experimental(Kind.SOURCE_SINK) +@SuppressWarnings({"nullness"}) +public class DebeziumIO { + private static final Logger LOG = LoggerFactory.getLogger(DebeziumIO.class); + + /** + * Read data from a Debezium source. + * + * @param Type of the data to be read. + */ + public static Read read() { + return new AutoValue_DebeziumIO_Read.Builder().build(); + } + + /** + * Read data from Debezium source and convert a Kafka {@link + * org.apache.kafka.connect.source.SourceRecord} into a JSON string using {@link + * org.apache.beam.io.debezium.SourceRecordJson.SourceRecordJsonMapper} as default function + * mapper. + * + * @return Reader object of String. + */ + public static Read readAsJson() { + return new AutoValue_DebeziumIO_Read.Builder() + .setFormatFunction(new SourceRecordJson.SourceRecordJsonMapper()) + .setCoder(StringUtf8Coder.of()) + .build(); + } + + /** Disallow construction of utility class. */ + private DebeziumIO() {} + + /** Implementation of {@link #read}. */ + @AutoValue + public abstract static class Read extends PTransform> { + + private static final long serialVersionUID = 1L; + + abstract @Nullable ConnectorConfiguration getConnectorConfiguration(); + + abstract @Nullable SourceRecordMapper getFormatFunction(); + + abstract @Nullable Integer getMaxNumberOfRecords(); + + abstract @Nullable Coder getCoder(); + + abstract Builder toBuilder(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setConnectorConfiguration(ConnectorConfiguration config); + + abstract Builder setCoder(Coder coder); + + abstract Builder setFormatFunction(SourceRecordMapper mapperFn); + + abstract Builder setMaxNumberOfRecords(Integer maxNumberOfRecords); + + abstract Read build(); + } + + /** + * Applies the given configuration to the connector. It cannot be null. + * + * @param config Configuration to be used within the connector. + * @return PTransform {@link #read} + */ + public Read withConnectorConfiguration(final ConnectorConfiguration config) { + checkArgument(config != null, "config can not be null"); + return toBuilder().setConnectorConfiguration(config).build(); + } + + /** + * Applies a {@link SourceRecordMapper} to the connector. It cannot be null. + * + * @param mapperFn the mapper function to be used on each {@link + * org.apache.kafka.connect.source.SourceRecord}. + * @return PTransform {@link #read} + */ + public Read withFormatFunction(SourceRecordMapper mapperFn) { + checkArgument(mapperFn != null, "mapperFn can not be null"); + return toBuilder().setFormatFunction(mapperFn).build(); + } + + /** + * Applies a {@link Coder} to the connector. It cannot be null + * + * @param coder The Coder to be used over the data. + * @return PTransform {@link #read} + */ + public Read withCoder(Coder coder) { + checkArgument(coder != null, "coder can not be null"); + return toBuilder().setCoder(coder).build(); + } + + /** + * Once the specified number of records has been reached, it will stop fetching them. The value + * can be null (default) which means it will not stop. + * + * @param maxNumberOfRecords The maximum number of records to be fetched before stop. 
+ * @return PTransform {@link #read} + */ + public Read withMaxNumberOfRecords(Integer maxNumberOfRecords) { + return toBuilder().setMaxNumberOfRecords(maxNumberOfRecords).build(); + } + + @Override + public PCollection expand(PBegin input) { + return input + .apply( + Create.of(Lists.newArrayList(getConnectorConfiguration().getConfigurationMap())) + .withCoder(MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))) + .apply( + ParDo.of( + new KafkaSourceConsumerFn<>( + getConnectorConfiguration().getConnectorClass().get(), + getFormatFunction(), + getMaxNumberOfRecords()))) + .setCoder(getCoder()); + } + } + + /** A POJO describing a Debezium configuration. */ + @AutoValue + public abstract static class ConnectorConfiguration implements Serializable { + private static final long serialVersionUID = 1L; + + abstract @Nullable ValueProvider> getConnectorClass(); + + abstract @Nullable ValueProvider getHostName(); + + abstract @Nullable ValueProvider getPort(); + + abstract @Nullable ValueProvider getUsername(); + + abstract @Nullable ValueProvider getPassword(); + + abstract @Nullable ValueProvider getSourceConnector(); + + abstract @Nullable ValueProvider> getConnectionProperties(); + + abstract Builder builder(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setConnectorClass(ValueProvider> connectorClass); + + abstract Builder setHostName(ValueProvider hostname); + + abstract Builder setPort(ValueProvider port); + + abstract Builder setUsername(ValueProvider username); + + abstract Builder setPassword(ValueProvider password); + + abstract Builder setConnectionProperties( + ValueProvider> connectionProperties); + + abstract Builder setSourceConnector(ValueProvider sourceConnector); + + abstract ConnectorConfiguration build(); + } + + /** + * Creates a ConnectorConfiguration. + * + * @return {@link ConnectorConfiguration} + */ + public static ConnectorConfiguration create() { + return new AutoValue_DebeziumIO_ConnectorConfiguration.Builder() + .setConnectionProperties(ValueProvider.StaticValueProvider.of(new HashMap<>())) + .build(); + } + + /** + * Applies the connectorClass to be used to connect to your database. + * + *

+ * <p>Currently supported connectors are:
+ *
+ * <ul>
+ *   <li>{@link io.debezium.connector.mysql.MySqlConnector}
+ *   <li>{@link io.debezium.connector.postgresql.PostgresConnector}
+ *   <li>{@link io.debezium.connector.sqlserver.SqlServerConnector}
+ * </ul>
+ * + * @param connectorClass Any of the supported connectors. + * @return {@link ConnectorConfiguration} + */ + public ConnectorConfiguration withConnectorClass(Class connectorClass) { + checkArgument(connectorClass != null, "connectorClass can not be null"); + return withConnectorClass(ValueProvider.StaticValueProvider.of(connectorClass)); + } + + /** + * Sets the connectorClass to be used to connect to your database. It cannot be null. + * + *

+ * <p>Currently supported connectors are:
+ *
+ * <ul>
+ *   <li>{@link io.debezium.connector.mysql.MySqlConnector}
+ *   <li>{@link io.debezium.connector.postgresql.PostgresConnector}
+ *   <li>{@link io.debezium.connector.sqlserver.SqlServerConnector}
+ * </ul>
+ * + * @param connectorClass (as ValueProvider) + * @return {@link ConnectorConfiguration} + */ + public ConnectorConfiguration withConnectorClass(ValueProvider> connectorClass) { + checkArgument(connectorClass != null, "connectorClass can not be null"); + return builder().setConnectorClass(connectorClass).build(); + } + + /** + * Sets the host name to be used on the database. It cannot be null. + * + * @param hostName The hostname of your database. + * @return {@link ConnectorConfiguration} + */ + public ConnectorConfiguration withHostName(String hostName) { + checkArgument(hostName != null, "hostName can not be null"); + return withHostName(ValueProvider.StaticValueProvider.of(hostName)); + } + + /** + * Sets the host name to be used on the database. It cannot be null. + * + * @param hostName The hostname of your database (as ValueProvider). + * @return {@link ConnectorConfiguration} + */ + public ConnectorConfiguration withHostName(ValueProvider hostName) { + checkArgument(hostName != null, "hostName can not be null"); + return builder().setHostName(hostName).build(); + } + + /** + * Sets the port on which your database is listening. It cannot be null. + * + * @param port The port to be used to connect to your database (as ValueProvider). + * @return {@link ConnectorConfiguration} + */ + public ConnectorConfiguration withPort(String port) { + checkArgument(port != null, "port can not be null"); + return withPort(ValueProvider.StaticValueProvider.of(port)); + } + + /** + * Sets the port on which your database is listening. It cannot be null. + * + * @param port The port to be used to connect to your database. + * @return {@link ConnectorConfiguration} + */ + public ConnectorConfiguration withPort(ValueProvider port) { + checkArgument(port != null, "port can not be null"); + return builder().setPort(port).build(); + } + + /** + * Sets the username to connect to your database. It cannot be null. + * + * @param username Database username + * @return {@link ConnectorConfiguration} + */ + public ConnectorConfiguration withUsername(String username) { + checkArgument(username != null, "username can not be null"); + return withUsername(ValueProvider.StaticValueProvider.of(username)); + } + + /** + * Sets the username to connect to your database. It cannot be null. + * + * @param username (as ValueProvider). + * @return {@link ConnectorConfiguration} + */ + public ConnectorConfiguration withUsername(ValueProvider username) { + checkArgument(username != null, "username can not be null"); + return builder().setUsername(username).build(); + } + + /** + * Sets the password to connect to your database. It cannot be null. + * + * @param password Database password + * @return {@link ConnectorConfiguration} + */ + public ConnectorConfiguration withPassword(String password) { + checkArgument(password != null, "password can not be null"); + return withPassword(ValueProvider.StaticValueProvider.of(password)); + } + + /** + * Sets the password to connect to your database. It cannot be null. + * + * @param password (as ValueProvider). + * @return {@link ConnectorConfiguration} + */ + public ConnectorConfiguration withPassword(ValueProvider password) { + checkArgument(password != null, "password can not be null"); + return builder().setPassword(password).build(); + } + + /** + * Sets a custom property to be used within the connection to your database. + * + *

+ * <p>You may use this to set special configurations such as:
+ *
+ * <ul>
+ *   <li>slot.name
+ *   <li>database.dbname
+ *   <li>database.server.id
+ *   <li>...
+ * </ul>
+ * + * @param connectionProperties Properties (Key, Value) Map + * @return {@link ConnectorConfiguration} + */ + public ConnectorConfiguration withConnectionProperties( + Map connectionProperties) { + checkArgument(connectionProperties != null, "connectionProperties can not be null"); + return withConnectionProperties(ValueProvider.StaticValueProvider.of(connectionProperties)); + } + + /** + * Sets a custom property to be used within the connection to your database. + * + *

+ * <p>You may use this to set special configurations such as:
+ *
+ * <ul>
+ *   <li>slot.name
+ *   <li>database.dbname
+ *   <li>database.server.id
+ *   <li>...
+ * </ul>
+ * + * @param connectionProperties (as ValueProvider). + * @return {@link ConnectorConfiguration} + */ + public ConnectorConfiguration withConnectionProperties( + ValueProvider> connectionProperties) { + checkArgument(connectionProperties != null, "connectionProperties can not be null"); + return builder().setConnectionProperties(connectionProperties).build(); + } + + /** + * Sets a custom property to be used within the connection to your database. + * + *

+ * <p>You may use this to set special configurations such as:
+ *
+ * <ul>
+ *   <li>slot.name
+ *   <li>database.dbname
+ *   <li>database.server.id
+ *   <li>...
+ * </ul>
+ * + * @param key Property name + * @param value Property value + * @return {@link ConnectorConfiguration} + */ + public ConnectorConfiguration withConnectionProperty(String key, String value) { + checkArgument(key != null, "key can not be null"); + checkArgument(value != null, "value can not be null"); + checkArgument( + getConnectionProperties().get() != null, "connectionProperties can not be null"); + + ConnectorConfiguration config = builder().build(); + config.getConnectionProperties().get().putIfAbsent(key, value); + return config; + } + + /** + * Sets the {@link SourceConnector} to be used. It cannot be null. + * + * @param sourceConnector Any supported connector + * @return {@link ConnectorConfiguration} + */ + public ConnectorConfiguration withSourceConnector(SourceConnector sourceConnector) { + checkArgument(sourceConnector != null, "sourceConnector can not be null"); + return withSourceConnector(ValueProvider.StaticValueProvider.of(sourceConnector)); + } + + public ConnectorConfiguration withSourceConnector( + ValueProvider sourceConnector) { + checkArgument(sourceConnector != null, "sourceConnector can not be null"); + return builder().setSourceConnector(sourceConnector).build(); + } + + /** + * Configuration Map Getter. + * + * @return Configuration Map. + */ + public Map getConfigurationMap() { + HashMap configuration = new HashMap<>(); + + configuration.computeIfAbsent( + "connector.class", k -> getConnectorClass().get().getCanonicalName()); + configuration.computeIfAbsent("database.hostname", k -> getHostName().get()); + configuration.computeIfAbsent("database.port", k -> getPort().get()); + configuration.computeIfAbsent("database.user", k -> getUsername().get()); + configuration.computeIfAbsent("database.password", k -> getPassword().get()); + + for (Map.Entry entry : getConnectionProperties().get().entrySet()) { + configuration.computeIfAbsent(entry.getKey(), k -> entry.getValue()); + } + + // Set default Database History impl. if not provided + configuration.computeIfAbsent( + "database.history", + k -> KafkaSourceConsumerFn.DebeziumSDFDatabaseHistory.class.getName()); + + String stringProperties = Joiner.on('\n').withKeyValueSeparator(" -> ").join(configuration); + LOG.debug("---------------- Connector configuration: {}", stringProperties); + + return configuration; + } + } +} diff --git a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java new file mode 100644 index 0000000000000..c5a5b4f25b727 --- /dev/null +++ b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java @@ -0,0 +1,394 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.io.debezium; + +import io.debezium.document.Document; +import io.debezium.document.DocumentReader; +import io.debezium.document.DocumentWriter; +import io.debezium.relational.history.AbstractDatabaseHistory; +import io.debezium.relational.history.DatabaseHistoryException; +import io.debezium.relational.history.HistoryRecord; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Consumer; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; +import org.apache.beam.sdk.transforms.splittabledofn.SplitResult; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; +import org.apache.kafka.connect.source.SourceConnector; +import org.apache.kafka.connect.source.SourceRecord; +import org.apache.kafka.connect.source.SourceTask; +import org.apache.kafka.connect.source.SourceTaskContext; +import org.apache.kafka.connect.storage.OffsetStorageReader; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.DateTime; +import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + * + *

+ * <h3>Quick Overview</h3>
+ *
+ * <p>SDF used to process records fetched from supported Debezium Connectors.
+ *
+ * <p>Currently it has a time limiter (see {@link OffsetTracker}) which, if set, will stop the
+ * consumer automatically after the specified number of minutes has elapsed. Otherwise, it will
+ * keep running until the user explicitly interrupts it.
+ *
+ * <p>It might be initialized either with a limit on the number of records:
+ *
+ * <pre>KafkaSourceConsumerFn(connectorClass, sourceRecordMapper, maxRecords)</pre>
+ *
+ * <p>Or with a time limiter:
+ *
+ * <pre>KafkaSourceConsumerFn(connectorClass, sourceRecordMapper, minutesToRun)</pre>
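+ *
+ * <p>For example, a consumer limited to 100 records could look like the sketch below. The
+ * {@code Integer} third argument selects the records-based restriction; a primitive {@code int}
+ * or {@code long} argument resolves to the time-based constructor instead.
+ *
+ * <pre>{@code
+ * new KafkaSourceConsumerFn<String>(
+ *     MySqlConnector.class,
+ *     new SourceRecordJson.SourceRecordJsonMapper(),
+ *     Integer.valueOf(100));
+ * }</pre>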
+ */ +@SuppressWarnings({"nullness"}) +public class KafkaSourceConsumerFn extends DoFn, T> { + private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceConsumerFn.class); + public static final String BEAM_INSTANCE_PROPERTY = "beam.parent.instance"; + + private final Class connectorClass; + private final SourceRecordMapper fn; + + private static long minutesToRun = -1; + private static Integer maxRecords; + private static DateTime startTime; + private static final Map>> + restrictionTrackers = new ConcurrentHashMap<>(); + + /** + * Initializes the SDF with a time limit. + * + * @param connectorClass Supported Debezium connector class + * @param fn a SourceRecordMapper + * @param minutesToRun Maximum time to run (in minutes) + */ + public KafkaSourceConsumerFn( + Class connectorClass, SourceRecordMapper fn, long minutesToRun) { + this.connectorClass = (Class) connectorClass; + this.fn = fn; + KafkaSourceConsumerFn.minutesToRun = minutesToRun; + } + + /** + * Initializes the SDF to be run indefinitely. + * + * @param connectorClass Supported Debezium connector class + * @param fn a SourceRecordMapper + */ + public KafkaSourceConsumerFn( + Class connectorClass, SourceRecordMapper fn, Integer maxRecords) { + this.connectorClass = (Class) connectorClass; + this.fn = fn; + KafkaSourceConsumerFn.maxRecords = maxRecords; + } + + @GetInitialRestriction + public OffsetHolder getInitialRestriction(@Element Map unused) + throws IOException { + KafkaSourceConsumerFn.startTime = new DateTime(); + return new OffsetHolder(null, null, null); + } + + @NewTracker + public RestrictionTracker> newTracker( + @Restriction OffsetHolder restriction) { + return new OffsetTracker(restriction); + } + + @GetRestrictionCoder + public Coder getRestrictionCoder() { + return SerializableCoder.of(OffsetHolder.class); + } + + /** + * Process the retrieved element. Currently it just logs the retrieved record as JSON. 
+ * + * @param element Record retrieved + * @param tracker Restriction Tracker + * @param receiver Output Receiver + * @return + * @throws Exception + */ + @DoFn.ProcessElement + public ProcessContinuation process( + @Element Map element, + RestrictionTracker> tracker, + OutputReceiver receiver) + throws Exception { + Map configuration = new HashMap<>(element); + + // Adding the current restriction to the class object to be found by the database history + restrictionTrackers.put(this.getHashCode(), tracker); + configuration.put(BEAM_INSTANCE_PROPERTY, this.getHashCode()); + + SourceConnector connector = connectorClass.getDeclaredConstructor().newInstance(); + connector.start(configuration); + + SourceTask task = (SourceTask) connector.taskClass().getDeclaredConstructor().newInstance(); + + try { + Map consumerOffset = tracker.currentRestriction().offset; + LOG.debug("--------- Consumer offset from Debezium Tracker: {}", consumerOffset); + + task.initialize(new BeamSourceTaskContext(tracker.currentRestriction().offset)); + task.start(connector.taskConfigs(1).get(0)); + + List records = task.poll(); + + if (records == null) { + LOG.debug("-------- Pulled records null"); + return ProcessContinuation.stop(); + } + + LOG.debug("-------- {} records found", records.size()); + if (!records.isEmpty()) { + for (SourceRecord record : records) { + LOG.debug("-------- Record found: {}", record); + + Map offset = (Map) record.sourceOffset(); + + if (offset == null || !tracker.tryClaim(offset)) { + LOG.debug("-------- Offset null or could not be claimed"); + return ProcessContinuation.stop(); + } + + T json = this.fn.mapSourceRecord(record); + LOG.debug("****************** RECEIVED SOURCE AS JSON: {}", json); + + receiver.output(json); + } + + task.commit(); + } + } catch (Exception ex) { + LOG.error( + "-------- Error on consumer: {}. with stacktrace: {}", + ex.getMessage(), + ex.getStackTrace()); + } finally { + restrictionTrackers.remove(this.getHashCode()); + + LOG.debug("------- Stopping SourceTask"); + task.stop(); + } + + return ProcessContinuation.resume().withResumeDelay(Duration.standardSeconds(1)); + } + + public String getHashCode() { + return Integer.toString(System.identityHashCode(this)); + } + + private static class BeamSourceTaskContext implements SourceTaskContext { + private final @Nullable Map initialOffset; + + BeamSourceTaskContext(@Nullable Map initialOffset) { + this.initialOffset = initialOffset; + } + + @Override + public Map configs() { + // TODO(pabloem): Do we need to implement this? 
+ throw new UnsupportedOperationException("unimplemented"); + } + + @Override + public OffsetStorageReader offsetStorageReader() { + LOG.debug("------------- Creating an offset storage reader"); + return new DebeziumSourceOffsetStorageReader(initialOffset); + } + } + + private static class DebeziumSourceOffsetStorageReader implements OffsetStorageReader { + private final Map offset; + + DebeziumSourceOffsetStorageReader(Map initialOffset) { + this.offset = initialOffset; + } + + @Override + public Map offset(Map partition) { + return offsets(Collections.singletonList(partition)) + .getOrDefault(partition, ImmutableMap.of()); + } + + @Override + public Map, Map> offsets( + Collection> partitions) { + LOG.debug("-------------- GETTING OFFSETS!"); + + Map, Map> map = new HashMap<>(); + for (Map partition : partitions) { + map.put(partition, (Map) offset); + } + + LOG.debug("-------------- OFFSETS: {}", map); + return map; + } + } + + static class OffsetHolder implements Serializable { + public final @Nullable Map offset; + public final @Nullable List history; + public final @Nullable Integer fetchedRecords; + + OffsetHolder( + @Nullable Map offset, + @Nullable List history, + @Nullable Integer fetchedRecords) { + this.offset = offset; + this.history = history == null ? new ArrayList<>() : history; + this.fetchedRecords = fetchedRecords; + } + } + + /** {@link RestrictionTracker} for Debezium connectors. */ + static class OffsetTracker extends RestrictionTracker> { + private OffsetHolder restriction; + private static final long MILLIS = 60 * 1000; + + OffsetTracker(OffsetHolder holder) { + this.restriction = holder; + } + + /** + * Overriding {@link #tryClaim} in order to stop fetching records from the database. + * + *

+ * <p>This works in two different ways:
+ *
+ * <h3>Number of records</h3>
+ *
+ * <p>This is the default behavior. Once the specified number of records has been reached, it
+ * will stop fetching them.
+ *
+ * <h3>Time based</h3>
+ * + * User may specify the amount of time the connector to be kept alive. Please see {@link + * KafkaSourceConsumerFn} for more details on this. + * + * @param position Currently not used + * @return boolean + */ + @Override + public boolean tryClaim(Map position) { + LOG.debug("-------------- Claiming {} used to have: {}", position, restriction.offset); + long elapsedTime = System.currentTimeMillis() - startTime.getMillis(); + int fetchedRecords = + this.restriction.fetchedRecords == null ? 0 : this.restriction.fetchedRecords + 1; + LOG.debug("-------------- Time running: {} / {}", elapsedTime, (minutesToRun * MILLIS)); + this.restriction = new OffsetHolder(position, this.restriction.history, fetchedRecords); + LOG.debug("-------------- History: {}", this.restriction.history); + + if (maxRecords == null && minutesToRun == -1) { + return true; + } + + if (maxRecords != null) { + return fetchedRecords < maxRecords; + } + + return elapsedTime < minutesToRun * MILLIS; + } + + @Override + public OffsetHolder currentRestriction() { + return restriction; + } + + @Override + public SplitResult trySplit(double fractionOfRemainder) { + LOG.debug("-------------- Trying to split: fractionOfRemainder={}", fractionOfRemainder); + return SplitResult.of(new OffsetHolder(null, null, null), restriction); + } + + @Override + public void checkDone() throws IllegalStateException {} + + @Override + public IsBounded isBounded() { + return IsBounded.BOUNDED; + } + } + + public static class DebeziumSDFDatabaseHistory extends AbstractDatabaseHistory { + private List history; + + public DebeziumSDFDatabaseHistory() { + this.history = new ArrayList<>(); + } + + @Override + public void start() { + super.start(); + LOG.debug( + "------------ STARTING THE DATABASE HISTORY! - trackers: {} - config: {}", + restrictionTrackers, + config.asMap()); + + // We fetch the first key to get the first restriction tracker. + // TODO(BEAM-11737): This may cause issues with multiple trackers in the future. + RestrictionTracker tracker = + restrictionTrackers.get(restrictionTrackers.keySet().iterator().next()); + this.history = (List) tracker.currentRestriction().history; + } + + @Override + protected void storeRecord(HistoryRecord record) throws DatabaseHistoryException { + LOG.debug("------------- Adding history! {}", record); + + history.add(DocumentWriter.defaultWriter().writeAsBytes(record.document())); + } + + @Override + protected void recoverRecords(Consumer consumer) { + LOG.debug("------------- Trying to recover!"); + + try { + for (byte[] record : history) { + Document doc = DocumentReader.defaultReader().read(record); + consumer.accept(new HistoryRecord(doc)); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public boolean exists() { + return history != null && !history.isEmpty(); + } + + @Override + public boolean storageExists() { + return history != null && !history.isEmpty(); + } + } +} diff --git a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/SourceRecordJson.java b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/SourceRecordJson.java new file mode 100644 index 0000000000000..10bcb143ed4f2 --- /dev/null +++ b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/SourceRecordJson.java @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.io.debezium; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; +import org.apache.beam.vendor.grpc.v1p26p0.com.google.gson.Gson; +import org.apache.beam.vendor.grpc.v1p26p0.com.google.gson.GsonBuilder; +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.errors.DataException; +import org.apache.kafka.connect.source.SourceRecord; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** + * This class can be used as a mapper for each {@link SourceRecord} retrieved. + * + *

+ * <h3>What it does</h3>
+ *
+ * <p>It maps any SourceRecord retrieved from any supported {@link io.debezium.connector} to JSON.
+ *
+ * <h3>How it works</h3>
+ *
+ * <p>It will extract valuable fields from any given SourceRecord:
+ *
+ * <ul>
+ *   <li>before - {@link #loadBefore}
+ *   <li>after - {@link #loadAfter}
+ *   <li>metadata - {@link #loadMetadata}
+ *       <ul>
+ *         <li>schema - Database Schema
+ *         <li>connector - Connector used
+ *         <li>version - Connector version
+ *       </ul>
+ * </ul>
+ *
+ * <h3>Usage Example</h3>
+ *
+ * <p>Map each SourceRecord to JSON:
+ *
+ * <pre>
+ *     DebeziumIO.read()
+ *         .withFormatFunction(new SourceRecordJson.SourceRecordJsonMapper())
+ * </pre>
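+ *
+ * <p>The emitted string is a JSON object of roughly this shape (sketch; the concrete fields
+ * depend on the connector and table):
+ *
+ * <pre>{@code {"metadata": {...}, "before": {...}, "after": {...}}}</pre>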
+ */ +@SuppressWarnings({"nullness"}) +public class SourceRecordJson { + private final @Nullable SourceRecord sourceRecord; + private final @Nullable Struct value; + private final @Nullable Event event; + + /** + * Initializer. + * + * @param sourceRecord retrieved SourceRecord using a supported SourceConnector + */ + public SourceRecordJson(@Nullable SourceRecord sourceRecord) { + if (sourceRecord == null) { + throw new IllegalArgumentException(); + } + + this.sourceRecord = sourceRecord; + this.value = (Struct) sourceRecord.value(); + + if (this.value == null) { + this.event = new Event(null, null, null); + } else { + Event.Metadata metadata = this.loadMetadata(); + Event.Before before = this.loadBefore(); + Event.After after = this.loadAfter(); + + this.event = new Event(metadata, before, after); + } + } + + /** + * Extracts metadata from the SourceRecord. + * + * @return Metadata + */ + private Event.Metadata loadMetadata() { + @Nullable Struct source; + try { + source = (Struct) this.value.get("source"); + } catch (RuntimeException e) { + throw new IllegalArgumentException(); + } + @Nullable String schema; + + if (source == null) { + return null; + } + + try { + // PostgreSQL and SQL server use Schema + schema = source.getString("schema"); + } catch (DataException e) { + // MySQL uses file instead + schema = source.getString("file"); + } + + return new Event.Metadata( + source.getString("connector"), + source.getString("version"), + source.getString("name"), + source.getString("db"), + schema, + source.getString("table")); + } + + /** + * Extracts the before field within SourceRecord. + * + * @return Before + */ + private Event.Before loadBefore() { + @Nullable Struct before; + try { + before = (Struct) this.value.get("before"); + } catch (DataException e) { + return null; + } + if (before == null) { + return null; + } + + Map fields = new HashMap<>(); + for (Field field : before.schema().fields()) { + fields.put(field.name(), before.get(field)); + } + + return new Event.Before(fields); + } + + /** + * Extracts the after field within SourceRecord. + * + * @return After + */ + private Event.After loadAfter() { + @Nullable Struct after; + try { + after = (Struct) this.value.get("after"); + } catch (DataException e) { + return null; + } + if (after == null) { + return null; + } + + Map fields = new HashMap<>(); + for (Field field : after.schema().fields()) { + fields.put(field.name(), after.get(field)); + } + + return new Event.After(fields); + } + + /** + * Transforms the extracted data to a JSON string. + * + * @return JSON String + */ + public String toJson() { + return this.event.toJson(); + } + + /** {@link SourceRecordJson} implementation. */ + public static class SourceRecordJsonMapper implements SourceRecordMapper { + @Override + public String mapSourceRecord(SourceRecord sourceRecord) throws Exception { + return new SourceRecordJson(sourceRecord).toJson(); + } + } + + /** Depicts a SourceRecord as an Event in order for it to be mapped as JSON. */ + static class Event implements Serializable { + private final SourceRecordJson.Event.Metadata metadata; + private final SourceRecordJson.Event.Before before; + private final SourceRecordJson.Event.After after; + + /** + * Event Initializer. 
+ * + * @param metadata Metadata retrieved from SourceRecord + * @param before Before data retrieved from SourceRecord + * @param after After data retrieved from SourceRecord + */ + public Event( + SourceRecordJson.Event.Metadata metadata, + SourceRecordJson.Event.Before before, + SourceRecordJson.Event.After after) { + this.metadata = metadata; + this.before = before; + this.after = after; + } + + /** + * Transforms the Event to a JSON string. + * + * @return JSON String + */ + public String toJson() { + Gson gson = new GsonBuilder().serializeNulls().create(); + return gson.toJson(this); + } + + /** Depicts the metadata within a SourceRecord. It has valuable fields. */ + static class Metadata implements Serializable { + private final @Nullable String connector; + private final @Nullable String version; + private final @Nullable String name; + private final @Nullable String database; + private final @Nullable String schema; + private final @Nullable String table; + + /** + * Metadata Initializer. + * + * @param connector Connector used + * @param version Connector version + * @param name Connector name + * @param database DB name + * @param schema Schema name + * @param table Table name + */ + public Metadata( + @Nullable String connector, + @Nullable String version, + @Nullable String name, + @Nullable String database, + @Nullable String schema, + @Nullable String table) { + this.connector = connector; + this.version = version; + this.name = name; + this.database = database; + this.schema = schema; + this.table = table; + } + } + + /** Depicts the before field within SourceRecord. */ + static class Before implements Serializable { + private final @Nullable Map fields; + + /** + * Before Initializer. + * + * @param fields Key - Value map with information within Before + */ + public Before(@Nullable Map fields) { + this.fields = fields; + } + } + + /** Depicts the after field within SourceRecord. */ + static class After implements Serializable { + private final @Nullable Map fields; + + /** + * After Initializer. + * + * @param fields Key - Value map with information within After + */ + public After(@Nullable Map fields) { + this.fields = fields; + } + } + } +} diff --git a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/SourceRecordMapper.java b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/SourceRecordMapper.java new file mode 100644 index 0000000000000..65e42e915f452 --- /dev/null +++ b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/SourceRecordMapper.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.beam.io.debezium;
+
+import java.io.Serializable;
+import org.apache.kafka.connect.source.SourceRecord;
+
+/**
+ * Interface used to map a Kafka source record.
+ *
+ * @param <T> The type each Kafka SourceRecord should be mapped to
+ */
+@FunctionalInterface
+public interface SourceRecordMapper<T> extends Serializable {
+  T mapSourceRecord(SourceRecord sourceRecord) throws Exception;
+}
diff --git a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/package-info.java b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/package-info.java
new file mode 100644
index 0000000000000..86ba1f593c9e5
--- /dev/null
+++ b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/package-info.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Transforms for reading change data from databases using DebeziumIO.
+ *
+ * @see org.apache.beam.io.debezium.DebeziumIO
+ */
+@Experimental(Kind.SOURCE_SINK)
+package org.apache.beam.io.debezium;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
diff --git a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOMySqlConnectorIT.java b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOMySqlConnectorIT.java
new file mode 100644
index 0000000000000..6056ca0ebf6c9
--- /dev/null
+++ b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOMySqlConnectorIT.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.debezium;
+
+import static org.apache.beam.sdk.testing.SerializableMatchers.hasItem;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import io.debezium.connector.mysql.MySqlConnector;
+import java.time.Duration;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.testcontainers.containers.MySQLContainer;
+import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
+import org.testcontainers.utility.DockerImageName;
+
+@RunWith(JUnit4.class)
+public class DebeziumIOMySqlConnectorIT {
+  /**
+   * Debezium - MySqlContainer
+   *
+   * <p>Creates a docker container using the image used by the debezium tutorial.
+   */
+  @ClassRule
+  public static final MySQLContainer<?> MY_SQL_CONTAINER =
+      new MySQLContainer<>(
+              DockerImageName.parse("debezium/example-mysql:1.4")
+                  .asCompatibleSubstituteFor("mysql"))
+          .withPassword("debezium")
+          .withUsername("mysqluser")
+          .withExposedPorts(3306)
+          .waitingFor(
+              new HttpWaitStrategy()
+                  .forPort(3306)
+                  .forStatusCodeMatching(response -> response == 200)
+                  .withStartupTimeout(Duration.ofMinutes(2)));
+
+  /**
+   * Debezium - MySQL connector Test.
+   *
+   * <p>Tests that the connector can actually connect to the database.
+   */
+  @Test
+  public void testDebeziumIOMySql() {
+    MY_SQL_CONTAINER.start();
+
+    String host = MY_SQL_CONTAINER.getContainerIpAddress();
+    String port = MY_SQL_CONTAINER.getMappedPort(3306).toString();
+
+    PipelineOptions options = PipelineOptionsFactory.create();
+    Pipeline p = Pipeline.create(options);
+    PCollection<String> results =
+        p.apply(
+            DebeziumIO.<String>read()
+                .withConnectorConfiguration(
+                    DebeziumIO.ConnectorConfiguration.create()
+                        .withUsername("debezium")
+                        .withPassword("dbz")
+                        .withConnectorClass(MySqlConnector.class)
+                        .withHostName(host)
+                        .withPort(port)
+                        .withConnectionProperty("database.server.id", "184054")
+                        .withConnectionProperty("database.server.name", "dbserver1")
+                        .withConnectionProperty("database.include.list", "inventory")
+                        .withConnectionProperty("include.schema.changes", "false"))
+                .withFormatFunction(new SourceRecordJson.SourceRecordJsonMapper())
+                .withMaxNumberOfRecords(30)
+                .withCoder(StringUtf8Coder.of()));
+    String expected =
+        "{\"metadata\":{\"connector\":\"mysql\",\"version\":\"1.3.1.Final\",\"name\":\"dbserver1\","
+            + "\"database\":\"inventory\",\"schema\":\"mysql-bin.000003\",\"table\":\"addresses\"},\"before\":null,"
+            + "\"after\":{\"fields\":{\"zip\":\"76036\",\"city\":\"Euless\","
+            + "\"street\":\"3183 Moore Avenue\",\"id\":10,\"state\":\"Texas\",\"customer_id\":1001,"
+            + "\"type\":\"SHIPPING\"}}}";
+
+    PAssert.that(results)
+        .satisfies(
+            (Iterable<String> res) -> {
+              assertThat(res, hasItem(expected));
+              return null;
+            });
+
+    p.run().waitUntilFinish();
+    MY_SQL_CONTAINER.stop();
+  }
+}
diff --git a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOTest.java b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOTest.java
new file mode 100644
index 0000000000000..ccf57b6cdec9f
--- /dev/null
+++ b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.debezium;
+
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import io.debezium.config.Configuration;
+import io.debezium.connector.mysql.MySqlConnector;
+import io.debezium.connector.mysql.MySqlConnectorConfig;
+import java.io.Serializable;
+import java.util.Map;
+import org.apache.beam.io.debezium.DebeziumIO.ConnectorConfiguration;
+import org.apache.kafka.common.config.ConfigValue;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Tests for DebeziumIO. */
+@RunWith(JUnit4.class)
+public class DebeziumIOTest implements Serializable {
+  private static final Logger LOG = LoggerFactory.getLogger(DebeziumIOTest.class);
+  private static final ConnectorConfiguration MYSQL_CONNECTOR_CONFIGURATION =
+      ConnectorConfiguration.create()
+          .withUsername("debezium")
+          .withPassword("dbz")
+          .withHostName("127.0.0.1")
+          .withPort("3306")
+          .withConnectorClass(MySqlConnector.class)
+          .withConnectionProperty("database.server.id", "184054")
+          .withConnectionProperty("database.server.name", "dbserver1")
+          .withConnectionProperty("database.include.list", "inventory")
+          .withConnectionProperty(
+              "database.history", KafkaSourceConsumerFn.DebeziumSDFDatabaseHistory.class.getName())
+          .withConnectionProperty("include.schema.changes", "false");
+
+  @Test
+  public void testSourceMySqlConnectorValidConfiguration() {
+    Map<String, String> configurationMap = MYSQL_CONNECTOR_CONFIGURATION.getConfigurationMap();
+
+    Configuration debeziumConf = Configuration.from(configurationMap);
+    Map<String, ConfigValue> validConfig = debeziumConf.validate(MySqlConnectorConfig.ALL_FIELDS);
+
+    for (ConfigValue configValue : validConfig.values()) {
+      assertTrue(configValue.errorMessages().isEmpty());
+    }
+  }
+
+  @Test
+  public void testSourceConnectorUsernamePassword() {
+    String username = "debezium";
+    String password = "dbz";
+    ConnectorConfiguration configuration =
+        MYSQL_CONNECTOR_CONFIGURATION.withUsername(username).withPassword(password);
+    Map<String, String> configurationMap = configuration.getConfigurationMap();
+
+    Configuration debeziumConf = Configuration.from(configurationMap);
+    Map<String, ConfigValue> validConfig = debeziumConf.validate(MySqlConnectorConfig.ALL_FIELDS);
+
+    for (ConfigValue configValue : validConfig.values()) {
+      assertTrue(configValue.errorMessages().isEmpty());
+    }
+  }
+
+  @Test
+  public void testSourceConnectorNullPassword() {
+    String username = "debezium";
+    String password = null;
+
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> MYSQL_CONNECTOR_CONFIGURATION.withUsername(username).withPassword(password));
+  }
+
+  @Test
+  public void testSourceConnectorNullUsernameAndPassword() {
+    String username = null;
+    String password = null;
+
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> MYSQL_CONNECTOR_CONFIGURATION.withUsername(username).withPassword(password));
+  }
+}
diff --git a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/KafkaSourceConsumerFnTest.java b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/KafkaSourceConsumerFnTest.java
new file mode 100644
index 0000000000000..c22f8a33dd2bf
--- /dev/null
+++ b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/KafkaSourceConsumerFnTest.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.debezium;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.MapCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.source.SourceConnector;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.apache.kafka.connect.source.SourceTaskContext;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class KafkaSourceConsumerFnTest implements Serializable {
+  @Test
+  public void testKafkaSourceConsumerFn() {
+    Map<String, String> config =
+        ImmutableMap.of(
+            "from", "1",
+            "to", "10",
+            "delay", "0.4",
+            "topic", "any");
+
+    Pipeline pipeline = Pipeline.create();
+
+    PCollection<Integer> counts =
+        pipeline
+            .apply(
+                Create.of(Lists.newArrayList(config))
+                    .withCoder(MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())))
+            .apply(
+                ParDo.of(
+                    new KafkaSourceConsumerFn<>(
+                        CounterSourceConnector.class,
+                        sourceRecord -> (Integer) sourceRecord.value(),
+                        10)))
+            .setCoder(VarIntCoder.of());
+
+    PAssert.that(counts).containsInAnyOrder(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+    pipeline.run().waitUntilFinish();
+  }
+
+  @Test
+  public void testStoppableKafkaSourceConsumerFn() {
+    Map<String, String> config =
+        ImmutableMap.of(
+            "from", "1",
+            "to", "3",
+            "delay", "0.2",
+            "topic", "any");
+
+    Pipeline pipeline = Pipeline.create();
+
+    PCollection<Integer> counts =
+        pipeline
+            .apply(
+                Create.of(Lists.newArrayList(config))
+                    .withCoder(MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())))
+            .apply(
+                ParDo.of(
+                    new KafkaSourceConsumerFn<>(
+                        CounterSourceConnector.class,
+                        sourceRecord -> (Integer) sourceRecord.value(),
+                        1)))
+            .setCoder(VarIntCoder.of());
+
+    pipeline.run().waitUntilFinish();
+    Assert.assertEquals(3, CounterTask.getCountTasks());
+  }
+}
+
+class CounterSourceConnector extends SourceConnector {
+  public static class CounterSourceConnectorConfig extends AbstractConfig {
+    final Map<String, String> props;
+
+    CounterSourceConnectorConfig(Map<String, String> props) {
+      super(configDef(), props);
+      this.props = props;
+    }
+
+    protected static ConfigDef configDef() {
+      return new ConfigDef()
+          .define("from", ConfigDef.Type.INT, ConfigDef.Importance.HIGH, "Number to start from")
+          .define("to", ConfigDef.Type.INT, ConfigDef.Importance.HIGH, "Number to go to")
+          .define(
+              "delay", ConfigDef.Type.DOUBLE, ConfigDef.Importance.HIGH, "Time between each event")
+          .define(
+              "topic",
+              ConfigDef.Type.STRING,
+              ConfigDef.Importance.HIGH,
+              "Name of Kafka topic to produce to");
+    }
+  }
+
+  @Nullable private CounterSourceConnectorConfig connectorConfig;
+
+  @Override
+  public void start(Map<String, String> props) {
+    this.connectorConfig = new CounterSourceConnectorConfig(props);
+  }
+
+  @Override
+  public Class<? extends Task> taskClass() {
+    return CounterTask.class;
+  }
+
+  @Override
+  public List<Map<String, String>> taskConfigs(int maxTasks) {
+    if (this.connectorConfig == null || this.connectorConfig.props == null) {
+      return Collections.emptyList();
+    }
+
+    return Collections.singletonList(
+        ImmutableMap.of(
+            "from", this.connectorConfig.props.get("from"),
+            "to", this.connectorConfig.props.get("to"),
+            "delay", this.connectorConfig.props.get("delay"),
+            "topic", this.connectorConfig.props.get("topic")));
+  }
+
+  @Override
+  public void stop() {}
+
+  @Override
+  public ConfigDef config() {
+    return CounterSourceConnectorConfig.configDef();
+  }
+
+  @Override
+  public String version() {
+    return "ONE";
+  }
+}
+
+class CounterTask extends SourceTask {
+  private static int countStopTasks = 0;
+  private String topic = "";
+  private Integer from = 0;
+  private Integer to = 0;
+  private Double delay = 0.0;
+
+  private Long start = System.currentTimeMillis();
+  private Integer last = 0;
+  private Object lastOffset = null;
+
+  private static final String PARTITION_FIELD = "mod";
+  private static final Integer PARTITION_NAME = 1;
+
+  @Override
+  public String version() {
+    return "ONE";
+  }
+
+  @Override
+  public void initialize(SourceTaskContext context) {
+    super.initialize(context);
+
+    Map<String, Object> offset =
+        context
+            .offsetStorageReader()
+            .offset(Collections.singletonMap(PARTITION_FIELD, PARTITION_NAME));
+
+    if (offset == null) {
+      this.start = System.currentTimeMillis();
+      this.last = 0;
+    } else {
+      this.start = (Long) offset.get("start");
+      this.last = ((Long) offset.getOrDefault("last", 0)).intValue();
+    }
+    this.lastOffset = offset;
+  }
+
+  @Override
+  public void start(Map<String, String> props) {
+    this.topic = props.getOrDefault("topic", "");
+    this.from = Integer.parseInt(props.getOrDefault("from", "0"));
+    this.to = Integer.parseInt(props.getOrDefault("to", "0"));
+    this.delay = Double.parseDouble(props.getOrDefault("delay", "0"));
+
+    if (this.lastOffset != null) {
+      return;
+    }
+
+    this.start =
+        props.containsKey("start")
+            ? Long.parseLong(props.get("start"))
+            : System.currentTimeMillis();
+    this.last = this.from - 1;
+  }
+
+  @Override
+  public List<SourceRecord> poll() throws InterruptedException {
+    if (this.last.equals(to)) {
+      return null;
+    }
+
+    List<SourceRecord> records = new ArrayList<>();
+    Long callTime = System.currentTimeMillis();
+    Long secondsSinceStart = (callTime - this.start) / 1000;
+    Long recordsToOutput = Math.round(Math.floor(secondsSinceStart / this.delay));
+
+    while (this.last < this.to) {
+      this.last = this.last + 1;
+      Map<String, Integer> sourcePartition = Collections.singletonMap(PARTITION_FIELD, 1);
+      Map<String, Long> sourceOffset =
+          ImmutableMap.of("last", this.last.longValue(), "start", this.start);
+
+      records.add(
+          new SourceRecord(
+              sourcePartition, sourceOffset, this.topic, Schema.INT64_SCHEMA, this.last));
+
+      if (records.size() >= recordsToOutput) {
+        break;
+      }
+    }
+
+    return records;
+  }
+
+  @Override
+  public void stop() {
+    CounterTask.countStopTasks++;
+  }
+
+  public static int getCountTasks() {
+    return CounterTask.countStopTasks;
+  }
+}
diff --git a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/OffsetTrackerTest.java b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/OffsetTrackerTest.java
new file mode 100644
index 0000000000000..b8a6c9b031e3a
--- /dev/null
+++ b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/OffsetTrackerTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.debezium;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import io.debezium.connector.mysql.MySqlConnector;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class OffsetTrackerTest implements Serializable {
+  @Test
+  public void testRestrictByNumberOfRecords() throws IOException {
+    Integer maxNumRecords = 10;
+    Map<String, Object> position = new HashMap<>();
+    KafkaSourceConsumerFn<String> kafkaSourceConsumerFn =
+        new KafkaSourceConsumerFn<>(
+            MySqlConnector.class, new SourceRecordJson.SourceRecordJsonMapper(), maxNumRecords);
+    KafkaSourceConsumerFn.OffsetHolder restriction =
+        kafkaSourceConsumerFn.getInitialRestriction(new HashMap<>());
+    KafkaSourceConsumerFn.OffsetTracker tracker =
+        new KafkaSourceConsumerFn.OffsetTracker(restriction);
+
+    for (int records = 0; records < maxNumRecords; records++) {
+      assertTrue("OffsetTracker should continue", tracker.tryClaim(position));
+    }
+    assertFalse("OffsetTracker should stop", tracker.tryClaim(position));
+  }
+
+  @Test
+  public void testRestrictByAmountOfTime() throws IOException, InterruptedException {
+    long millis = 60 * 1000;
+    long minutesToRun = 1;
+    Map<String, Object> position = new HashMap<>();
+    KafkaSourceConsumerFn<String> kafkaSourceConsumerFn =
+        new KafkaSourceConsumerFn<>(
+            MySqlConnector.class, new SourceRecordJson.SourceRecordJsonMapper(), minutesToRun);
+    KafkaSourceConsumerFn.OffsetHolder restriction =
+        kafkaSourceConsumerFn.getInitialRestriction(new HashMap<>());
+    KafkaSourceConsumerFn.OffsetTracker tracker =
+        new KafkaSourceConsumerFn.OffsetTracker(restriction);
+
+    assertTrue(tracker.tryClaim(position));
+
+    Thread.sleep(minutesToRun * millis + 100);
+
+    assertFalse(tracker.tryClaim(position));
+  }
+}
diff --git a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/SourceRecordJsonTest.java b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/SourceRecordJsonTest.java
new file mode 100644
index 0000000000000..badd01eee292e
--- /dev/null
+++ b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/SourceRecordJsonTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.debezium;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+import java.io.Serializable;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class SourceRecordJsonTest implements Serializable {
+  @Test
+  public void testSourceRecordJson() {
+    SourceRecord record = this.buildSourceRecord();
+    SourceRecordJson json = new SourceRecordJson(record);
+
+    String jsonString = json.toJson();
+
+    String expectedJson =
+        "{\"metadata\":"
+            + "{\"connector\":\"test-connector\","
+            + "\"version\":\"version-connector\","
+            + "\"name\":\"test-connector-sql\","
+            + "\"database\":\"test-db\","
+            + "\"schema\":\"test-schema\","
+            + "\"table\":\"test-table\"},"
+            + "\"before\":{\"fields\":{\"column1\":\"before-name\"}},"
+            + "\"after\":{\"fields\":{\"column1\":\"after-name\"}}}";
+
+    assertEquals(expectedJson, jsonString);
+  }
+
+  @Test
+  public void testSourceRecordJsonWhenSourceRecordIsNull() {
+    assertThrows(IllegalArgumentException.class, () -> new SourceRecordJson(null));
+  }
+
+  private Schema buildSourceSchema() {
+    return SchemaBuilder.struct()
+        .field("connector", Schema.STRING_SCHEMA)
+        .field("version", Schema.STRING_SCHEMA)
+        .field("name", Schema.STRING_SCHEMA)
+        .field("db", Schema.STRING_SCHEMA)
+        .field("schema", Schema.STRING_SCHEMA)
+        .field("table", Schema.STRING_SCHEMA)
+        .build();
+  }
+
+  private Schema buildBeforeSchema() {
+    return SchemaBuilder.struct().field("column1", Schema.STRING_SCHEMA).build();
+  }
+
+  private Schema buildAfterSchema() {
+    return SchemaBuilder.struct().field("column1", Schema.STRING_SCHEMA).build();
+  }
+
+  private SourceRecord buildSourceRecord() {
+    final Schema sourceSchema = this.buildSourceSchema();
+    final Schema beforeSchema = this.buildBeforeSchema();
+    final Schema afterSchema = this.buildAfterSchema();
+
+    final Schema schema =
+        SchemaBuilder.struct()
+            .name("test")
+            .field("source", sourceSchema)
+            .field("before", beforeSchema)
+            .field("after", afterSchema)
+            .build();
+
+    final Struct source = new Struct(sourceSchema);
+    final Struct before = new Struct(beforeSchema);
+    final Struct after = new Struct(afterSchema);
+    final Struct value = new Struct(schema);
+
+    source.put("connector", "test-connector");
+    source.put("version", "version-connector");
+    source.put("name", "test-connector-sql");
+    source.put("db", "test-db");
+    source.put("schema", "test-schema");
+    source.put("table", "test-table");
+
+    before.put("column1", "before-name");
+    after.put("column1", "after-name");
+
+    value.put("source", source);
+    value.put("before", before);
+    value.put("after", after);
+
+    return new SourceRecord(null, null, null, schema, value);
+  }
+}
diff --git a/settings.gradle.kts b/settings.gradle.kts
index e6f037e47a672..c01df186cb81b 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -140,6 +140,7 @@ include(":sdks:java:io:cassandra")
 include(":sdks:java:io:clickhouse")
 include(":sdks:java:io:common")
 include(":sdks:java:io:contextualtextio")
+include(":sdks:java:io:debezium")
 include(":sdks:java:io:elasticsearch")
 include(":sdks:java:io:elasticsearch-tests:elasticsearch-tests-2")
 include(":sdks:java:io:elasticsearch-tests:elasticsearch-tests-5")
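Usage note for reviewers (illustrative only, not part of the patch): the integration test above already exercises the new API end to end inside a test harness. As a standalone reference, a minimal pipeline wired the same way might look roughly like the sketch below. It uses only builder methods that appear in this diff (read, withConnectorConfiguration, withFormatFunction, withMaxNumberOfRecords, withCoder); the class name, host, port and credentials are placeholder values chosen for illustration.

import io.debezium.connector.mysql.MySqlConnector;
import org.apache.beam.io.debezium.DebeziumIO;
import org.apache.beam.io.debezium.SourceRecordJson;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;

// Hypothetical example class, not included in this patch.
public class DebeziumReadExample {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Read MySQL change events and map each SourceRecord to a JSON string.
    PCollection<String> changeEvents =
        pipeline.apply(
            DebeziumIO.<String>read()
                .withConnectorConfiguration(
                    DebeziumIO.ConnectorConfiguration.create()
                        .withUsername("debezium") // placeholder credentials
                        .withPassword("dbz")
                        .withConnectorClass(MySqlConnector.class)
                        .withHostName("localhost") // placeholder host and port
                        .withPort("3306")
                        .withConnectionProperty("database.server.id", "184054")
                        .withConnectionProperty("database.server.name", "dbserver1")
                        .withConnectionProperty("database.include.list", "inventory")
                        .withConnectionProperty("include.schema.changes", "false"))
                .withFormatFunction(new SourceRecordJson.SourceRecordJsonMapper())
                .withMaxNumberOfRecords(30) // bounds the otherwise unbounded CDC stream
                .withCoder(StringUtf8Coder.of()));

    pipeline.run().waitUntilFinish();
  }
}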