Skip to content

Commit

Permalink
Beam transform that uses DebeziumIO connector to support CDC
Browse files Browse the repository at this point in the history
Debeziumio PoC (#7)

* New DebeziumIO class.

* Merge connector code

* DebeziumIO and MySqlConnector integrated.

* Added FormatFuntion param to Read builder on DebeziumIO.

* Added arguments checker to DebeziumIO.

* Add simple JSON mapper object (#1)

* Add simple JSON mapper object

* Fixed Mapper.

* Add SqlServer connector test

* Added PostgreSql Connector Test

PostgreSql now works with Json mapper

* Added PostgreSql Connector Test

PostgreSql now works with Json mapper

* Fixing MySQL schema DataException

Using file instead of schema should fix it

* MySQL Connector updated from 1.3.0 to 1.3.1

Co-authored-by: osvaldo-salinas <osvaldo.salinas@wizeline.com>
Co-authored-by: Carlos Dominguez <carlos.dominguez@carlos.dominguez>
Co-authored-by: Carlos Domínguez <carlos.dominguez@wizeline.com>

* Add debeziumio tests

* Debeziumio testing json mapper (#3)

* Some code refactors. Use a default DBHistory if not provided

* Add basic tests for Json mapper

* Debeziumio time restriction (#5)

* Add simple JSON mapper object

* Fixed Mapper.

* Add SqlServer connector test

* Added PostgreSql Connector Test

PostgreSql now works with Json mapper

* Added PostgreSql Connector Test

PostgreSql now works with Json mapper

* Fixing MySQL schema DataException

Using file instead of schema should fix it

* MySQL Connector updated from 1.3.0 to 1.3.1

* Some code refactors. Use a default DBHistory if not provided

* Adding based-time restriction

Stop polling after specified amount of time

* Add basic tests for Json mapper

* Adding new restriction

Uses a time-based restriction

* Adding optional restrcition

Uses an optional time-based restriction

Co-authored-by: juanitodread <juanitodread@gmail.com>
Co-authored-by: osvaldo-salinas <osvaldo.salinas@wizeline.com>

* Upgrade DebeziumIO connector (#4)

* Address comments (Change dependencies to testCompile, Set JsonMapper/Coder as default, refactors) (#8)

* Revert file

* Change dependencies to testCompile
* Move Counter sample to unit test

* Set JsonMapper as default mapper function
* Set String Coder as default coder when using JsonMapper
* Change logs from info to debug

* Debeziumio javadoc (#9)

* Adding javadoc

* Added some titles and examples

* Added SourceRecordJson doc

* Added Basic Connector doc

* Added KafkaSourceConsumer doc

* Javadoc cleanup

* Removing BasicConnector

No usages of this class were found overall

* Editing documentation

* Debeziumio fetched records restriction (#10)

* Adding javadoc

* Adding restriction by number of fetched records

Also adding a quick-fix for null value within SourceRecords
Minor fix on both MySQL and PostgreSQL Connectors Tests

* Run either by time or by number of records

* Added DebeziumOffsetTrackerTest

Tests both restrictions: By amount of time and by Number of records

* Removing comment

* DebeziumIO test for DB2. (#11)

* DebeziumIO test for DB2.

* DebeziumIO javadoc.

* Clean code:removed commented code lines on DebeziumIOConnectorTest.java

* Clean code:removing unused imports and using readAsJson().

Co-authored-by: Carlos Domínguez <74681048+carlosdominguezwl@users.noreply.github.com>

* Debezium limit records (now configurable) (#12)

* Adding javadoc

* Records Limit is now configurable

(It was fixed before)

* Debeziumio dockerize (#13)

* Add mysql docker container to tests

* Move debezium mysql integration test to its own file

* Add assertion to verify that the results contains a record.

* Debeziumio readme (#15)

* Adding javadoc

* Adding README file

* Add number of records configuration to the DebeziumIO component (#16)

* Code refactors (#17)

* Remove/ignore null warnings

* Remove DB2 code

* Remove docker dependency in DebeziumIO unit test and max number of recods to MySql integration test

* Change access modifiers accordingly

* Remove incomplete integration tests (Postgres and SqlServer)

* Add experimenal tag

* Debezium testing stoppable consumer (#18)

* Add try-catch-finally, stop SourceTask at finally.

* Fix warnings

* stopConsumer and processedRecords local variables removed. UT for task stop use case added

* Fix minor code style issue

Co-authored-by: juanitodread <juanitodread@gmail.com>

* Fix style issues (check, spotlessApply) (#19)

Co-authored-by: Osvaldo Salinas <osvaldo.salinas@osvaldo.salinas>
Co-authored-by: alejandro.maguey <alejandro.maguey@wizeline.com>
Co-authored-by: osvaldo-salinas <osvaldo.salinas@wizeline.com>
Co-authored-by: Carlos Dominguez <carlos.dominguez@carlos.dominguez>
Co-authored-by: Carlos Domínguez <carlos.dominguez@wizeline.com>
Co-authored-by: Carlos Domínguez <74681048+carlosdominguezwl@users.noreply.github.com>
Co-authored-by: Alejandro Maguey <alexmaguey1@gmail.com>
Co-authored-by: Hassan Reyes <hassanreyes@users.noreply.github.com>

Add missing apache license to README.md

Enabling integration test for DebeziumIO (#20)

Rename connector package cdc=>debezium. Update doc references (#21)

Fix code style on DebeziumIOMySqlConnectorIT
  • Loading branch information
juanitodread committed Feb 17, 2021
1 parent edeaf67 commit ac5d258
Show file tree
Hide file tree
Showing 14 changed files with 2,175 additions and 0 deletions.
1 change: 1 addition & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ task("javaPostCommit") {
dependsOn(":runners:google-cloud-dataflow-java:postCommitRunnerV2")
dependsOn(":sdks:java:extensions:google-cloud-platform-core:postCommit")
dependsOn(":sdks:java:extensions:zetasketch:postCommit")
dependsOn(":sdks:java:io:debezium:integrationTest")
dependsOn(":sdks:java:io:google-cloud-platform:postCommit")
dependsOn(":sdks:java:io:kinesis:integrationTest")
dependsOn(":sdks:java:extensions:ml:postCommit")
Expand Down
83 changes: 83 additions & 0 deletions sdks/java/io/debezium/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import groovy.json.JsonOutput

plugins { id 'org.apache.beam.module' }
applyJavaNature(
automaticModuleName: 'org.apache.beam.sdk.io.debezium',
mavenRepositories: [
[id: 'io.confluent', url: 'https://packages.confluent.io/maven/']
],
checkerTooSlowOnTests: true,
enableSpotbugs: false,
)
provideIntegrationTestingDependencies()

description = "Apache Beam :: SDKs :: Java :: IO :: Debezium"
ext.summary = "Library to work with Debezium data."

dependencies {
compile library.java.vendored_guava_26_0_jre
compile project(path: ":sdks:java:core", configuration: "shadow")
compile library.java.slf4j_api
compile library.java.joda_time
provided library.java.jackson_dataformat_csv
testCompile project(path: ":sdks:java:core", configuration: "shadowTest")
testCompile project(path: ":sdks:java:io:common", configuration: "testRuntime")

// Test dependencies
testCompile library.java.junit
testRuntimeOnly library.java.slf4j_jdk14
testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
testCompile project(":runners:google-cloud-dataflow-java")
testCompile "org.testcontainers:testcontainers:1.15.1"
testCompile "org.testcontainers:mysql:1.15.1"

// Kafka connect dependencies
compile "org.apache.kafka:connect-api:2.5.0"
compile "org.apache.kafka:connect-json:2.5.0"

// Debezium dependencies
compile group: 'io.debezium', name: 'debezium-core', version: '1.3.1.Final'
testCompile group: 'io.debezium', name: 'debezium-connector-mysql', version: '1.3.1.Final'
}

test {
testLogging {
outputs.upToDateWhen {false}
showStandardStreams = true
}
}


task integrationTest(type: Test, dependsOn: processTestResources) {
group = "Verification"
systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
"--runner=DirectRunner",
])

// Disable Gradle cache: these ITs interact with live service that should always be considered "out of date"
outputs.upToDateWhen { false }

include '**/*IT.class'
classpath = sourceSets.test.runtimeClasspath
testClassesDirs = sourceSets.test.output.classesDirs

useJUnit {
}
}
178 changes: 178 additions & 0 deletions sdks/java/io/debezium/src/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# DebeziumIO
## Connect your Debezium Databases to Apache Beam easily.

### What is DebeziumIO?
DebeziumIO is an Apache Beam connector that lets users connect their Events-Driven Databases on [Debezium](https://debezium.io) to [Apache Beam](https://beam.apache.org/) without the need to set up a [Kafka](https://kafka.apache.org/) instance.

### Getting Started

DebeziumIO uses [Debezium Connectors v1.3](https://debezium.io/documentation/reference/1.3/connectors/) to connect to Apache Beam. All you need to do is choose the Debezium Connector that suits your Debezium setup and pick a [Serializable Function](https://beam.apache.org/releases/javadoc/2.23.0/org/apache/beam/sdk/transforms/SerializableFunction.html), then you will be able to connect to Apache Beam and start building your own Pipelines.

These connectors have been successfully tested and are known to work fine:
* MySQL Connector
* PostgreSQL Connector
* SQLServer Connector
* DB2 Connector

Other connectors might also work.


Setting up a connector and running a Pipeline should be as simple as:
```
Pipeline p = Pipeline.create(); // Create a Pipeline
p.apply(DebeziumIO.<String>read()
.withConnectorConfiguration(...) // Debezium Connector setup
.withFormatFunction(...) // Serializable Function to use
).setCoder(StringUtf8Coder.of());
p.run().waitUntilFinish(); // Run your pipeline!
```

### Setting up a Debezium Connector

DebeziumIO comes with a handy ConnectorConfiguration builder, which lets you provide all the configuration needed to access your Debezium Database.

A basic configuration such as **username**, **password**, **port number**, and **host name** must be specified along with the **Debezium Connector class** you will use by using these methods:

|Method|Param|Description|
|-|-|-|
|`.withConnectorClass(connectorClass)`|_Class_|Debezium Connector|
|`.withUsername(username)`|_String_|Database Username|
|`.withPassword(password)`|_String_|Database Password|
|`.withHostName(hostname)`|_String_|Database Hostname|
|`.withPort(portNumber)`|_String_|Database Port number|

You can also add more configuration, such as Connector-specific Properties with the `_withConnectionProperty_` method:

|Method|Params|Description|
|-|-|-|
|`.withConnectionProperty(propName, propValue)`|_String_, _String_|Adds a custom property to the connector.|
> **Note:** For more information on custom properties, see your [Debezium Connector](https://debezium.io/documentation/reference/1.3/connectors/) specific documentation.
Example of a MySQL Debezium Connector setup:
```
DebeziumIO.ConnectorConfiguration.create()
.withUsername("dbUsername")
.withPassword("dbPassword")
.withConnectorClass(MySqlConnector.class)
.withHostName("127.0.0.1")
.withPort("3306")
.withConnectionProperty("database.server.id", "serverId")
.withConnectionProperty("database.server.name", "serverName")
.withConnectionProperty("database.include.list", "dbName")
.withConnectionProperty("include.schema.changes", "false")
```

### Setting a Serializable Function

A serializable function is required to depict each `SourceRecord` fetched from the Database.

DebeziumIO comes with a built-in JSON Mapper that you can optionally use to map every `SourceRecord` fetched from the Database to a JSON object. This helps users visualize and access their data in a simple way.

If you want to use this built-in JSON Mapper, you can do it by setting an instance of **SourceRecordJsonMapper** as a Serializable Function to the DebeziumIO:
```
.withFormatFunction(new SourceRecordJson.SourceRecordJsonMapper())
```
> **Note:** `SourceRecordJsonMapper`comes out of the box, but you may use any Format Function you prefer.
## Quick Example

The following example is how an actual setup would look like using a **MySQL Debezium Connector** and **SourceRecordJsonMapper** as the Serializable Function.
```
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
p.apply(DebeziumIO.<String>read().
withConnectorConfiguration( // Debezium Connector setup
DebeziumIO.ConnectorConfiguration.create()
.withUsername("debezium")
.withPassword("dbz")
.withConnectorClass(MySqlConnector.class)
.withHostName("127.0.0.1")
.withPort("3306")
.withConnectionProperty("database.server.id", "184054")
.withConnectionProperty("database.server.name", "dbserver1")
.withConnectionProperty("database.include.list", "inventory")
.withConnectionProperty("include.schema.changes", "false")
).withFormatFunction(
new SourceRecordJson.SourceRecordJsonMapper() // Serializable Function
)
).setCoder(StringUtf8Coder.of());
p.run().waitUntilFinish();
```

## Shortcut!

If you will be using the built-in **SourceRecordJsonMapper** as your Serializable Function for all your pipelines, you should use **readAsJson()**.

DebeziumIO comes with a method called `readAsJson`, which automatically sets the `SourceRecordJsonMapper` as the Serializable Function for your pipeline. This way, you would need to setup your connector before running your pipeline, without explicitly setting a Serializable Function.

Example of using **readAsJson**:
```
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
p.apply(DebeziumIO.<String>read().
withConnectorConfiguration( // Debezium Connector setup
DebeziumIO.ConnectorConfiguration.create()
.withUsername("debezium")
.withPassword("dbz")
.withConnectorClass(MySqlConnector.class)
.withHostName("127.0.0.1")
.withPort("3306")
.withConnectionProperty("database.server.id", "184054")
.withConnectionProperty("database.server.name", "dbserver1")
.withConnectionProperty("database.include.list", "inventory")
.withConnectionProperty("include.schema.changes", "false"));
p.run().waitUntilFinish();
```

## Under the hood

### KafkaSourceConsumerFn and Restrictions

KafkaSourceConsumerFn (KSC onwards) is a [DoFn](https://beam.apache.org/releases/javadoc/2.3.0/org/apache/beam/sdk/transforms/DoFn.html) in charge of the Database replication and CDC.

There are two ways of initializing KSC:
* Restricted by number of records
* Restricted by amount of time (minutes)

By default, DebeziumIO initializes it with the former, though user may choose the latter by setting the amount of minutes as a parameter:

|Function|Param|Description|
|-|-|-|
|`KafkaSourceConsumerFn(connectorClass, recordMapper, maxRecords)`|_Class, SourceRecordMapper, Int_|Restrict run by number of records (Default).|
|`KafkaSourceConsumerFn(connectorClass, recordMapper, timeToRun)`|_Class, SourceRecordMapper, Long_|Restrict run by amount of time (in minutes).|

### Requirements and Supported versions

- JDK v8
- Debezium Connectors v1.3
- Apache Beam 2.25

## Running Unit Tests

You can run Integration Tests using **gradlew**.

Example of running the MySQL Connector Integration Test:
```
./gradlew integrationTest -p sdks/java/io/debezium/ --tests org.apache.beam.io.debezium.DebeziumIOMySqlConnectorIT -DintegrationTestRunner=direct
```
Loading

0 comments on commit ac5d258

Please sign in to comment.