diff --git a/build.gradle b/build.gradle index b215f2e0d20..792f7da735b 100644 --- a/build.gradle +++ b/build.gradle @@ -60,7 +60,7 @@ ext { commonsIoVersion = '2.11.0' commonsNetVersion = '3.9.0' curatorVersion = '5.5.0' - debeziumVersion = '2.2.0.Final' + debeziumVersion = '2.2.1.Final' derbyVersion = '10.16.1.1' findbugsVersion = '3.0.1' ftpServerVersion = '1.2.0' diff --git a/spring-integration-debezium/src/main/java/org/springframework/integration/debezium/dsl/Debezium.java b/spring-integration-debezium/src/main/java/org/springframework/integration/debezium/dsl/Debezium.java new file mode 100644 index 00000000000..8c506b3a97a --- /dev/null +++ b/spring-integration-debezium/src/main/java/org/springframework/integration/debezium/dsl/Debezium.java @@ -0,0 +1,92 @@ +/* + * Copyright 2023-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.debezium.dsl; + +import java.util.Properties; + +import io.debezium.engine.ChangeEvent; +import io.debezium.engine.DebeziumEngine; +import io.debezium.engine.format.JsonByteArray; +import io.debezium.engine.format.KeyValueHeaderChangeEventFormat; +import io.debezium.engine.format.SerializationFormat; + +import org.springframework.util.Assert; + +/** + * Factory class for Debezium DSL components. + * + * @author Christian Tzolov + * @author Artem Bilan + * + * @since 6.2 + */ +public final class Debezium { + + /** + * Create an instance of {@link DebeziumMessageProducerSpec} for the provided native debezium {@link Properties} and + * JSON serialization formats. + * @param debeziumConfig {@link Properties} with required debezium engine and connector properties. + * @return the spec. + */ + public static DebeziumMessageProducerSpec inboundChannelAdapter(Properties debeziumConfig) { + return inboundChannelAdapter(debeziumConfig, JsonByteArray.class, JsonByteArray.class); + } + + /** + * Create an instance of {@link DebeziumMessageProducerSpec} for the provided native debezium {@link Properties} and + * serialization formats. + * @param debeziumConfig {@link Properties} with required debezium engine and connector properties. + * @param messageFormat {@link SerializationFormat} format for the {@link ChangeEvent} key and payload. + * @param headerFormat {@link SerializationFormat} format for the {@link ChangeEvent} headers. + * @return the spec. + */ + public static DebeziumMessageProducerSpec inboundChannelAdapter(Properties debeziumConfig, + Class> messageFormat, + Class> headerFormat) { + + return inboundChannelAdapter(builder(debeziumConfig, messageFormat, headerFormat)); + } + + /** + * Create an instance of {@link DebeziumMessageProducerSpec} for the provided {@link DebeziumEngine.Builder}. + * @param debeziumEngineBuilder the {@link DebeziumEngine.Builder} to use. + * @return the spec. + */ + public static DebeziumMessageProducerSpec inboundChannelAdapter( + DebeziumEngine.Builder> debeziumEngineBuilder) { + + return new DebeziumMessageProducerSpec(debeziumEngineBuilder); + } + + private static DebeziumEngine.Builder> builder(Properties debeziumConfig, + Class> messageFormat, + Class> headerFormat) { + + Assert.notNull(messageFormat, "'messageFormat' must not be null"); + Assert.notNull(headerFormat, "'headerFormat' must not be null"); + Assert.notNull(debeziumConfig, "'debeziumConfig' must not be null"); + Assert.isTrue(debeziumConfig.containsKey("connector.class"), "The 'connector.class' property must be set"); + + return DebeziumEngine + .create(KeyValueHeaderChangeEventFormat.of(messageFormat, messageFormat, headerFormat)) + .using(debeziumConfig); + } + + private Debezium() { + } + +} diff --git a/spring-integration-debezium/src/main/java/org/springframework/integration/debezium/dsl/DebeziumMessageProducerSpec.java b/spring-integration-debezium/src/main/java/org/springframework/integration/debezium/dsl/DebeziumMessageProducerSpec.java new file mode 100644 index 00000000000..d01e4af589b --- /dev/null +++ b/spring-integration-debezium/src/main/java/org/springframework/integration/debezium/dsl/DebeziumMessageProducerSpec.java @@ -0,0 +1,127 @@ +/* + * Copyright 2023-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.debezium.dsl; + +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ThreadFactory; + +import io.debezium.engine.ChangeEvent; +import io.debezium.engine.DebeziumEngine; +import io.debezium.engine.Header; +import io.debezium.engine.format.SerializationFormat; + +import org.springframework.integration.debezium.inbound.DebeziumMessageProducer; +import org.springframework.integration.debezium.support.DefaultDebeziumHeaderMapper; +import org.springframework.integration.dsl.MessageProducerSpec; +import org.springframework.messaging.support.HeaderMapper; +import org.springframework.scheduling.concurrent.CustomizableThreadFactory; + +/** + * A {@link org.springframework.integration.dsl.MessageProducerSpec} for {@link DebeziumMessageProducer}. + * + * @author Christian Tzolov + * + * @since 6.2 + */ +public class DebeziumMessageProducerSpec + extends MessageProducerSpec { + + protected DebeziumMessageProducerSpec(DebeziumEngine.Builder> debeziumEngineBuilder) { + super(new DebeziumMessageProducer(debeziumEngineBuilder)); + } + + /** + * Enable the {@link ChangeEvent} batch mode handling. When enabled the channel adapter will send a {@link List} of + * {@link ChangeEvent}s as a payload in a single downstream {@link org.springframework.messaging.Message}. + * Such a batch payload is not serializable. + * By default, the batch mode is disabled, e.g. every input {@link ChangeEvent} is converted into a + * single downstream {@link org.springframework.messaging.Message}. + * @param enable set to true to enable the batch mode. Disabled by default. + * @return the spec. + */ + public DebeziumMessageProducerSpec enableBatch(boolean enable) { + this.target.setEnableBatch(enable); + return this; + } + + /** + * Enable support for tombstone (aka delete) messages. On a database row delete, Debezium can send a tombstone + * change event that has the same key as the deleted row and a value of {@link Optional#empty()}. This record is a + * marker for downstream processors. It indicates that log compaction can remove all records that have this key. + * When the tombstone functionality is enabled in the Debezium connector configuration you should enable the empty + * payload as well. + * @param enabled set true to enable the empty payload. Disabled by default. + * @return the spec. + */ + public DebeziumMessageProducerSpec enableEmptyPayload(boolean enabled) { + this.target.setEnableEmptyPayload(enabled); + return this; + } + + /** + * Set a {@link ThreadFactory} for the Debezium executor. Defaults to the {@link CustomizableThreadFactory} with a + * {@code debezium:inbound-channel-adapter-thread-} prefix. + * @param threadFactory the {@link ThreadFactory} instance to use. + * @return the spec. + */ + public DebeziumMessageProducerSpec threadFactory(ThreadFactory threadFactory) { + this.target.setThreadFactory(threadFactory); + return this; + } + + /** + * Set the outbound message content type. Must be aligned with the {@link SerializationFormat} configuration used by + * the provided {@link DebeziumEngine}. + * @param contentType payload content type. + * @return the spec. + */ + public DebeziumMessageProducerSpec contentType(String contentType) { + this.target.setContentType(contentType); + return this; + } + + /** + * Comma-separated list of names of {@link ChangeEvent} headers to be mapped into outbound Message headers. + * Debezium's NewRecordStateExtraction 'add.headers' property configures the metadata to be used as + * {@link ChangeEvent} headers. + *

+ * You should prefix the names passed to the 'headerNames' with the prefix configured by the Debezium + * 'add.headers.prefix' property. Later defaults to '__'. For example for 'add.headers=op,name' and + * 'add.headers.prefix=__' you should use header hames like: '__op', '__name'. + * @param headerNames The values in this list can be a simple patterns to be matched against the header names. + * @return the spec. + */ + public DebeziumMessageProducerSpec headerNames(String... headerNames) { + DefaultDebeziumHeaderMapper headerMapper = new DefaultDebeziumHeaderMapper(); + headerMapper.setHeaderNamesToMap(headerNames); + + return headerMapper(headerMapper); + } + + /** + * Set a {@link HeaderMapper} to convert the {@link ChangeEvent} headers + * into {@link org.springframework.messaging.Message} headers. + * @param headerMapper {@link HeaderMapper} implementation to use. Defaults to {@link DefaultDebeziumHeaderMapper}. + * @return the spec. + */ + public DebeziumMessageProducerSpec headerMapper(HeaderMapper>> headerMapper) { + this.target.setHeaderMapper(headerMapper); + return this; + } + +} diff --git a/spring-integration-debezium/src/main/java/org/springframework/integration/debezium/dsl/package-info.java b/spring-integration-debezium/src/main/java/org/springframework/integration/debezium/dsl/package-info.java new file mode 100644 index 00000000000..3df2a6ea52d --- /dev/null +++ b/spring-integration-debezium/src/main/java/org/springframework/integration/debezium/dsl/package-info.java @@ -0,0 +1,6 @@ +/** + * Provides classes for supporting Debezium component via Java DSL. + */ +@org.springframework.lang.NonNullApi +@org.springframework.lang.NonNullFields +package org.springframework.integration.debezium.dsl; diff --git a/spring-integration-debezium/src/main/java/org/springframework/integration/debezium/support/DebeziumHeaders.java b/spring-integration-debezium/src/main/java/org/springframework/integration/debezium/support/DebeziumHeaders.java index dfc1a7f784a..19d8ddec14e 100644 --- a/spring-integration-debezium/src/main/java/org/springframework/integration/debezium/support/DebeziumHeaders.java +++ b/spring-integration-debezium/src/main/java/org/springframework/integration/debezium/support/DebeziumHeaders.java @@ -20,6 +20,7 @@ * Pre-defined header names to be used when retrieving Debezium Change Event headers. * * @author Christian Tzolov + * * @since 6.2 */ public abstract class DebeziumHeaders { diff --git a/spring-integration-debezium/src/main/java/org/springframework/integration/debezium/support/DefaultDebeziumHeaderMapper.java b/spring-integration-debezium/src/main/java/org/springframework/integration/debezium/support/DefaultDebeziumHeaderMapper.java index d45cc472382..dbf9760b710 100644 --- a/spring-integration-debezium/src/main/java/org/springframework/integration/debezium/support/DefaultDebeziumHeaderMapper.java +++ b/spring-integration-debezium/src/main/java/org/springframework/integration/debezium/support/DefaultDebeziumHeaderMapper.java @@ -39,7 +39,7 @@ */ public class DefaultDebeziumHeaderMapper implements HeaderMapper>> { - private String[] headerNamesToMap = new String[0]; + private String[] headerNamesToMap = {"*"}; /** * Comma-separated list of names of Debezium's Change Event headers to be mapped to the outbound Message headers. diff --git a/spring-integration-debezium/src/test/java/org/springframework/integration/debezium/DebeziumMySqlTestContainer.java b/spring-integration-debezium/src/test/java/org/springframework/integration/debezium/DebeziumMySqlTestContainer.java new file mode 100644 index 00000000000..528ccc5010e --- /dev/null +++ b/spring-integration-debezium/src/test/java/org/springframework/integration/debezium/DebeziumMySqlTestContainer.java @@ -0,0 +1,84 @@ +/* + * Copyright 2023-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.debezium; + +import java.util.Properties; +import java.util.Random; + +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +/** + * @author Christian Tzolov + * @author Artem Bilan + * + * @since 6.2 + */ +@Testcontainers(disabledWithoutDocker = true) +public interface DebeziumMySqlTestContainer { + + int EXPECTED_DB_TX_COUNT = 52; + + @Container + GenericContainer DEBEZIUM_MYSQL = new GenericContainer<>("debezium/example-mysql:2.2.0.Final") + .withExposedPorts(3306) + .withEnv("MYSQL_ROOT_PASSWORD", "debezium") + .withEnv("MYSQL_USER", "mysqluser") + .withEnv("MYSQL_PASSWORD", "mysqlpw") + .waitingFor(new LogMessageWaitStrategy().withRegEx(".*port: 3306 MySQL Community Server - GPL.*.")); + + static int mysqlPort() { + return DEBEZIUM_MYSQL.getMappedPort(3306); + } + + static Properties connectorConfig(int port) { + Random random = new Random(); + + Properties config = new Properties(); + + config.put("transforms", "unwrap"); + config.put("transforms.unwrap.type", "io.debezium.transforms.ExtractNewRecordState"); + config.put("transforms.unwrap.drop.tombstones", "false"); + config.put("transforms.unwrap.delete.handling.mode", "rewrite"); + config.put("transforms.unwrap.add.fields", "name,db,op,table"); + config.put("transforms.unwrap.add.headers", "name,db,op,table"); + + config.put("schema.history.internal", "io.debezium.relational.history.MemorySchemaHistory"); + config.put("offset.storage", "org.apache.kafka.connect.storage.MemoryOffsetBackingStore"); + + config.put("name", "my-connector-" + random.nextInt(10)); + + // Topic prefix for the database server or cluster. + config.put("topic.prefix", "my-topic-" + random.nextInt(10)); + // Unique ID of the connector. + config.put("database.server.id", "8574" + random.nextInt(10)); + + config.put("key.converter.schemas.enable", "false"); + config.put("value.converter.schemas.enable", "false"); + + config.put("connector.class", "io.debezium.connector.mysql.MySqlConnector"); + config.put("database.user", "debezium"); + config.put("database.password", "dbz"); + config.put("database.hostname", "localhost"); + config.put("database.port", String.valueOf(port)); + + return config; + } + +} diff --git a/spring-integration-debezium/src/test/java/org/springframework/integration/debezium/dsl/DebeziumDslTests.java b/spring-integration-debezium/src/test/java/org/springframework/integration/debezium/dsl/DebeziumDslTests.java new file mode 100644 index 00000000000..9a59606581c --- /dev/null +++ b/spring-integration-debezium/src/test/java/org/springframework/integration/debezium/dsl/DebeziumDslTests.java @@ -0,0 +1,162 @@ +/* + * Copyright 2023-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.debezium.dsl; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import io.debezium.engine.ChangeEvent; +import io.debezium.engine.DebeziumEngine; +import io.debezium.engine.Header; +import io.debezium.engine.format.KeyValueHeaderChangeEventFormat; +import org.junit.jupiter.api.Test; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.log.LogAccessor; +import org.springframework.integration.config.EnableIntegration; +import org.springframework.integration.debezium.DebeziumMySqlTestContainer; +import org.springframework.integration.debezium.support.DebeziumHeaders; +import org.springframework.integration.dsl.IntegrationFlow; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; +import org.springframework.util.CollectionUtils; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author Christian Tzolov + * @author Artem Bilan + * + * @since 6.2 + */ +@SpringJUnitConfig +@DirtiesContext +public class DebeziumDslTests implements DebeziumMySqlTestContainer { + + static final LogAccessor logger = new LogAccessor(DebeziumDslTests.class); + + @Autowired + private Config config; + + @Test + void dslFromBuilder() throws InterruptedException { + assertThat(config.latch.await(30, TimeUnit.SECONDS)).isTrue(); + assertThat(config.payloads).hasSize(EXPECTED_DB_TX_COUNT); + assertThat(config.headerKeys).hasSize(EXPECTED_DB_TX_COUNT); + + config.headerKeys.forEach(keys -> { + assertThat(keys).contains("debezium_destination", "id", "contentType", "debezium_key", "timestamp"); + if (keys.size() > 5) { + assertThat(keys).contains("__name", "__db", "__table", "__op"); + } + }); + } + + @Test + void dslBatch() throws InterruptedException { + assertThat(config.batchLatch.await(30, TimeUnit.SECONDS)).isTrue(); + assertThat(config.bachPayloads) + .as("Sum of the message payload counts should correspond to the number of DB transactions") + .hasSize(EXPECTED_DB_TX_COUNT); + assertThat(config.batchHeaderKeys).hasSize(EXPECTED_DB_TX_COUNT); + assertThat(config.batchMessageCount) + .as("Batch mode: message count should be less than the sum of the payloads counts") + .isLessThan(EXPECTED_DB_TX_COUNT); + + config.batchHeaderKeys.stream() + .filter(headerNames -> !CollectionUtils.isEmpty(headerNames)) + .forEach(headerNames -> { + assertThat(headerNames).contains("__name", "__db", "__table", "__op"); + }); + } + + @Configuration + @EnableIntegration + public static class Config { + + private final CountDownLatch latch = new CountDownLatch(52); + private final List payloads = new ArrayList<>(); + private final List> headerKeys = new ArrayList<>(); + + private final CountDownLatch batchLatch = new CountDownLatch(52); + private final List bachPayloads = new ArrayList<>(); + private final List> batchHeaderKeys = new ArrayList<>(); + private int batchMessageCount = 0; + + @Bean + public IntegrationFlow streamFlowFromBuilder(DebeziumEngine.Builder> builder) { + + DebeziumMessageProducerSpec dsl = Debezium.inboundChannelAdapter(builder) + .headerNames("*") + .contentType("application/json") + .enableBatch(false) + .enableEmptyPayload(true); + + return IntegrationFlow.from(dsl) + .handle(m -> { + Object key = new String((byte[]) m.getHeaders().get(DebeziumHeaders.KEY)); + Object destination = m.getHeaders().get(DebeziumHeaders.DESTINATION); + logger.info("KEY: " + key + ", DESTINATION: " + destination); + + headerKeys.add(m.getHeaders().keySet()); + payloads.add(new String((byte[]) m.getPayload())); + latch.countDown(); + }) + .get(); + } + + @Bean + @SuppressWarnings("unchecked") + public IntegrationFlow batchFlowDirect() { + + DebeziumMessageProducerSpec dsl = Debezium + .inboundChannelAdapter( + DebeziumMySqlTestContainer.connectorConfig(DebeziumMySqlTestContainer.mysqlPort())) + .headerNames("*") + .contentType("application/json") + .enableBatch(true); + + return IntegrationFlow.from(dsl) + .handle(m -> { + batchMessageCount++; + List> batch = (List>) m.getPayload(); + batch.forEach(ce -> { + bachPayloads.add(new String(ce.value())); + batchHeaderKeys + .add(ce.headers().stream().map(Header::getKey).collect(Collectors.toList())); + batchLatch.countDown(); + }); + }).get(); + } + + @Bean + public DebeziumEngine.Builder> debeziumEngineBuilder() { + return DebeziumEngine.create(KeyValueHeaderChangeEventFormat + .of(io.debezium.engine.format.JsonByteArray.class, + io.debezium.engine.format.JsonByteArray.class, + io.debezium.engine.format.JsonByteArray.class)) + .using(DebeziumMySqlTestContainer.connectorConfig(DebeziumMySqlTestContainer.mysqlPort())); + } + } + +} diff --git a/spring-integration-debezium/src/test/java/org/springframework/integration/debezium/inbound/DebeziumMessageProducerTests.java b/spring-integration-debezium/src/test/java/org/springframework/integration/debezium/inbound/DebeziumMessageProducerTests.java index ded7dc04903..30205866435 100644 --- a/spring-integration-debezium/src/test/java/org/springframework/integration/debezium/inbound/DebeziumMessageProducerTests.java +++ b/spring-integration-debezium/src/test/java/org/springframework/integration/debezium/inbound/DebeziumMessageProducerTests.java @@ -38,6 +38,7 @@ /** * @author Christian Tzolov + * * @since 6.2 */ public class DebeziumMessageProducerTests { diff --git a/spring-integration-debezium/src/test/java/org/springframework/integration/debezium/it/DebeziumBatchTests.java b/spring-integration-debezium/src/test/java/org/springframework/integration/debezium/it/DebeziumBatchTests.java index 9601cdeef58..76f8a8b2698 100644 --- a/spring-integration-debezium/src/test/java/org/springframework/integration/debezium/it/DebeziumBatchTests.java +++ b/spring-integration-debezium/src/test/java/org/springframework/integration/debezium/it/DebeziumBatchTests.java @@ -16,6 +16,7 @@ package org.springframework.integration.debezium.it; +import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; @@ -32,6 +33,7 @@ import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.config.EnableIntegration; import org.springframework.integration.core.MessageProducer; +import org.springframework.integration.debezium.DebeziumMySqlTestContainer; import org.springframework.integration.debezium.inbound.DebeziumMessageProducer; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; @@ -39,6 +41,7 @@ import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; /** * @author Christian Tzolov @@ -50,38 +53,49 @@ @DirtiesContext public class DebeziumBatchTests implements DebeziumMySqlTestContainer { + private final List> allPayload = new ArrayList<>(); + @Autowired @Qualifier("queueChannel") private QueueChannel queueChannel; + private int batchCount = 0; @Test - @SuppressWarnings("unchecked") void batchMode() { - Message message = this.queueChannel.receive(20_000); - assertThat(message).isNotNull(); - List> payload = (List>) message.getPayload(); - assertThat(payload).hasSize(52); + await().until(this::receivePayloads, (count) -> count == EXPECTED_DB_TX_COUNT); + + assertThat(allPayload).hasSize(EXPECTED_DB_TX_COUNT); + assertThat(batchCount).isLessThan(EXPECTED_DB_TX_COUNT); for (int i = 0; i < 52; i++) { - ChangeEvent changeEvent = payload.get(i); + ChangeEvent changeEvent = allPayload.get(i); - List headerKeys = - changeEvent.headers() - .stream() - .map(Header::getKey) - .collect(Collectors.toList()); + List headerKeys = changeEvent.headers() + .stream() + .map(Header::getKey) + .collect(Collectors.toList()); + assertThat(changeEvent.destination()).startsWith("my-topic"); if (i < 16) { - assertThat(changeEvent.destination()).startsWith("my-topic"); assertThat(headerKeys).isEmpty(); } else { - assertThat(changeEvent.destination()).startsWith("my-topic.inventory"); + assertThat(changeEvent.destination()).contains(".inventory"); assertThat(headerKeys).hasSize(4).contains("__name", "__db", "__op", "__table"); } } } + @SuppressWarnings("unchecked") + private int receivePayloads() { + Message message = this.queueChannel.receive(500); + if (message != null) { + allPayload.addAll((List>) message.getPayload()); + batchCount++; + } + return allPayload.size(); + } + @Configuration @EnableIntegration @Import(DebeziumTestConfiguration.class) diff --git a/spring-integration-debezium/src/test/java/org/springframework/integration/debezium/it/DebeziumMySqlTestContainer.java b/spring-integration-debezium/src/test/java/org/springframework/integration/debezium/it/DebeziumMySqlTestContainer.java deleted file mode 100644 index 0b81bd1e1f5..00000000000 --- a/spring-integration-debezium/src/test/java/org/springframework/integration/debezium/it/DebeziumMySqlTestContainer.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright 2023-2023 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.integration.debezium.it; - -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.junit.jupiter.Container; -import org.testcontainers.junit.jupiter.Testcontainers; - -/** - * @author Christian Tzolov - * @author Artem Bilan - * - * @since 6.2 - */ -@Testcontainers(disabledWithoutDocker = true) -interface DebeziumMySqlTestContainer { - - @SuppressWarnings("resource") - @Container - GenericContainer DEBEZIUM_MYSQL = - new GenericContainer<>("debezium/example-mysql:2.2.0.Final") - .withExposedPorts(3306) - .withEnv("MYSQL_ROOT_PASSWORD", "debezium") - .withEnv("MYSQL_USER", "mysqluser") - .withEnv("MYSQL_PASSWORD", "mysqlpw"); - - static int mysqlPort() { - return DEBEZIUM_MYSQL.getMappedPort(3306); - } - -} diff --git a/spring-integration-debezium/src/test/java/org/springframework/integration/debezium/it/DebeziumStreamTests.java b/spring-integration-debezium/src/test/java/org/springframework/integration/debezium/it/DebeziumStreamTests.java index cc3bb3616cc..a4839553b58 100644 --- a/spring-integration-debezium/src/test/java/org/springframework/integration/debezium/it/DebeziumStreamTests.java +++ b/spring-integration-debezium/src/test/java/org/springframework/integration/debezium/it/DebeziumStreamTests.java @@ -28,6 +28,7 @@ import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.config.EnableIntegration; import org.springframework.integration.core.MessageProducer; +import org.springframework.integration.debezium.DebeziumMySqlTestContainer; import org.springframework.integration.debezium.inbound.DebeziumMessageProducer; import org.springframework.integration.debezium.support.DefaultDebeziumHeaderMapper; import org.springframework.messaging.Message; @@ -54,7 +55,7 @@ public class DebeziumStreamTests implements DebeziumMySqlTestContainer { @Test void streamMode() { boolean foundDebeziumHeaders = false; - for (int i = 0; i < 52; i++) { + for (int i = 0; i < EXPECTED_DB_TX_COUNT; i++) { Message message = this.queueChannel.receive(10_000); assertThat(message).isNotNull(); diff --git a/spring-integration-debezium/src/test/java/org/springframework/integration/debezium/it/DebeziumTestConfiguration.java b/spring-integration-debezium/src/test/java/org/springframework/integration/debezium/it/DebeziumTestConfiguration.java index bc2c4c5170a..a70634eab54 100644 --- a/spring-integration-debezium/src/test/java/org/springframework/integration/debezium/it/DebeziumTestConfiguration.java +++ b/spring-integration-debezium/src/test/java/org/springframework/integration/debezium/it/DebeziumTestConfiguration.java @@ -16,11 +16,8 @@ package org.springframework.integration.debezium.it; -import java.util.Properties; - import io.debezium.engine.ChangeEvent; import io.debezium.engine.DebeziumEngine; -import io.debezium.engine.format.JsonByteArray; import io.debezium.engine.format.KeyValueHeaderChangeEventFormat; import org.springframework.context.annotation.Bean; @@ -29,6 +26,7 @@ import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.config.EnableIntegration; +import org.springframework.integration.debezium.DebeziumMySqlTestContainer; import org.springframework.messaging.MessageChannel; /** @@ -43,36 +41,13 @@ public class DebeziumTestConfiguration { @Bean public DebeziumEngine.Builder> debeziumEngineBuilder() { - Properties config = new Properties(); - - config.put("transforms", "unwrap"); - config.put("transforms.unwrap.type", "io.debezium.transforms.ExtractNewRecordState"); - config.put("transforms.unwrap.drop.tombstones", "false"); - config.put("transforms.unwrap.delete.handling.mode", "rewrite"); - config.put("transforms.unwrap.add.fields", "name,db,op,table"); - config.put("transforms.unwrap.add.headers", "name,db,op,table"); - - config.put("schema.history.internal", "io.debezium.relational.history.MemorySchemaHistory"); - config.put("offset.storage", "org.apache.kafka.connect.storage.MemoryOffsetBackingStore"); - config.put("topic.prefix", "my-topic"); - config.put("name", "my-connector"); - config.put("database.server.id", "85744"); - config.put("database.server.name", "my-app-connector"); - - config.put("connector.class", "io.debezium.connector.mysql.MySqlConnector"); - config.put("database.user", "debezium"); - config.put("database.password", "dbz"); - config.put("database.hostname", "localhost"); - config.put("database.port", String.valueOf(DebeziumMySqlTestContainer.mysqlPort())); - - KeyValueHeaderChangeEventFormat format = - KeyValueHeaderChangeEventFormat.of( + return DebeziumEngine.create(KeyValueHeaderChangeEventFormat + .of( io.debezium.engine.format.JsonByteArray.class, io.debezium.engine.format.JsonByteArray.class, - io.debezium.engine.format.JsonByteArray.class); - - return DebeziumEngine.create(format).using(config); + io.debezium.engine.format.JsonByteArray.class)) + .using(DebeziumMySqlTestContainer.connectorConfig(DebeziumMySqlTestContainer.mysqlPort())); } @Bean diff --git a/spring-integration-debezium/src/test/java/org/springframework/integration/debezium/support/DefaultDebeziumHeaderMapperTests.java b/spring-integration-debezium/src/test/java/org/springframework/integration/debezium/support/DefaultDebeziumHeaderMapperTests.java index 64e2dbee5ec..154fe05f32e 100644 --- a/spring-integration-debezium/src/test/java/org/springframework/integration/debezium/support/DefaultDebeziumHeaderMapperTests.java +++ b/spring-integration-debezium/src/test/java/org/springframework/integration/debezium/support/DefaultDebeziumHeaderMapperTests.java @@ -47,6 +47,13 @@ public void fromHeaderNotSupported() { @Test public void defaultHeaders() { + assertThat(mapper.toHeaders(debeziumHeaders)).hasSize(4) + .containsKeys("NonStandard1", "NonStandard2", "id", "timestamp"); + } + + @Test + public void disableDebeziumHeaders() { + mapper.setHeaderNamesToMap(""); assertThat(mapper.toHeaders(debeziumHeaders)).hasSize(2) .containsKeys("id", "timestamp"); } diff --git a/src/reference/asciidoc/debezium.adoc b/src/reference/asciidoc/debezium.adoc index a32e9d670a5..ee65f32002c 100644 --- a/src/reference/asciidoc/debezium.adoc +++ b/src/reference/asciidoc/debezium.adoc @@ -1,12 +1,10 @@ [[debezium]] == Debezium Support -Spring Integration provides channel adapter for handling Change Events using Debezium. +https://debezium.io/documentation/reference/development/engine.html[Debezium Engine], Change Data Capture (CDC) inbound channel adapter. +The `DebeziumMessageProducer` allows capturing database change events, converting them into messages and streaming later to the outbound channels. -https://debezium.io/documentation/reference/development/engine.html[Debezium Engine] based Change Data Capture (CDC) channel adapter. -The Debezium adapter allows capturing database change events, converting them into messages and streaming those to the outbound channels. - -You need to include this dependency into your project: +You need to include the spring integration Debezium dependency to your project: ==== [source, xml, subs="normal", role="primary"] @@ -25,10 +23,48 @@ compile "org.springframework.integration:spring-integration-debezium:{project-ve ---- ==== +You also need to include a https://debezium.io/documentation/reference/connectors/index.html[debezium connector] dependency for your input Database. +For example to use Debezium with PostgreSQL you will need the postgres debezium connector: + +==== +[source, xml, subs="normal", role="primary"] +.Maven +---- + + io.debezium + debezium-connector-postgres + ${debezium-version} + + +---- +[source, groovy, subs="normal", role="secondary"] +.Gradle +---- +compile "io.debezium:debezium-connector-postgres:{debezium-version}" +---- +==== + +[NOTE] +==== +Replace the `debezium-version` with the version compatible with the `spring-integration-debezium` version being used. +==== + [[debezium-inbound]] === Inbound Debezium Channel Adapter -The Debezium adapter expects a pre-configured `DebeziumEngine.Builder>` bean instance. +The Debezium adapter expects a pre-configured `DebeziumEngine.Builder>` instance. + +[TIP] +==== +The https://github.com/spring-cloud/stream-applications/tree/main/functions/supplier/debezium-supplier[debezium-supplier] provides an out of the box `DebeziumEngine.Builder` Spring Boot auto-configuration with a handy https://github.com/spring-cloud/stream-applications/blob/main/functions/supplier/debezium-supplier/src/main/java/org/springframework/cloud/fn/supplier/debezium/DebeziumProperties.java[DebeziumProperties] configuration abstraction. +==== + +[TIP] +==== +The <> can create a `DebeziumMessageProducer` instance from a provided `DebeziumEngine.Builder`, as well as from a plain Debezium configuration (e.g. `java.util.Properties`). +Later can be handy for some common use-cases with opinionated configuration and serialization formats. +==== + Additionally, the `DebeziumMessageProducer` can be tuned with the following configuration properties: - `contentType` - allows handling for `JSON` (default), `AVRO` and `PROTOBUF` message contents. @@ -40,7 +76,8 @@ Such a payload is not serializable and would require a custom serialization/dese On a database row delete, Debezium can send a tombstone change event that has the same key as the deleted row and a value of `Optional.empty`. Defaults to `false`. - `headerMapper` - custom `HeaderMapper` implementation that allows for selecting and converting the `ChangeEvent` headers into `Message` headers. -The default `DefaultDebeziumHeaderMapper` implementation (no headers are mapped) provides a setter for `setHeaderNamesToMap`. +The default `DefaultDebeziumHeaderMapper` implementation provides a setter for `setHeaderNamesToMap`. +By default, all headers are mapped. - `threadFactory` - Set custom `ThreadFactory` for the Debezium executor service. Debezium Engine is designed to be submitted to an `Executor` or `ExecutorService` for execution by single thread. @@ -58,7 +95,7 @@ public class DebeziumJavaApplication { public static void main(String[] args) { new SpringApplicationBuilder(DebeziumJavaApplication.class) - .web(false) + .web(WebApplicationType.NONE) .run(args); } @@ -79,12 +116,31 @@ public class DebeziumJavaApplication { } @ServiceActivator(inputChannel = "debeziumInputChannel") - public void handler(String changeEventData) { - System.out.println(changeEventData); + public void handler(Message message) { + + Object destination = message.getHeaders().get(DebeziumHeaders.DESTINATION); # <1> + + String key = new String((byte[]) message.getHeaders().get(DebeziumHeaders.KEY)); # <2> + + String payload = new String((byte[]) message.getPayload()); # <3> + + System.out.println("KEY: " + key + ", DESTINATION: " + destination + ", PAYLOAD: " + payload); } } ---- +<1> A name of the logical destination for which the event is intended. + Usually the destination is composed of the `topic.prefix` configuration option, the database name and the table name. For example: `my-topic.inventory.orders`. +<2> Contains the schema for the changed table's key and the changed row's actual key. + Both the key schema and its corresponding key payload contain a field for each column in the changed table's `PRIMARY KEY` (or unique constraint) at the time the connector created the event. +<3> Like the key, the payload has a schema section and a payload value section. + The schema section contains the schema that describes the Envelope structure of the payload value section, including its nested fields. + Change events for operations that create, update or delete data all have a value payload with an envelope structure. +==== + +[TIP] +==== +The `key.converter.schemas.enable=false` and/or `value.converter.schemas.enable=false` permit disabling the in-message schema content for key or payload respectively. ==== Similarly, we can configure the `DebeziumMessageProducer` to process the incoming change events in batches: @@ -110,8 +166,36 @@ public void handler(List> payload) { ---- ==== +[[debezium-java-dsl]] +=== Debezium Java DSL Support + +The `spring-integration-debezium` provides a convenient Java DSL fluent API via the `Debezium` factory and the `DebeziumMessageProducerSpec` implementations. -==== Configuring with the Java DSL +The Inbound Channel Adapter for Debezium Java DSL is: +==== +[source, java] +---- + DebeziumEngine.Builder> debeziumEngineBuilder = ... + IntegrationFlow.from( + Debezium.inboundChannelAdapter(debeziumEngineBuilder) + .headerNames("special*") + .contentType("application/json") + .enableBatch(false)) + .handle(m -> System.out.println(new String((byte[]) m.getPayload()))) +---- +==== + +Or create an `DebeziumMessageProducerSpec` instance from native debezium configuration properties and default to `JSON` serialization formats. + +==== +[source, java] +---- + Properties debeziumConfig = ... + IntegrationFlow + .from(Debezium.inboundChannelAdapter(debeziumConfig)) + .handle(m -> System.out.println(new String((byte[]) m.getPayload()))) +---- +==== The following Spring Boot application provides an example of configuring the inbound adapter with the Java DSL: @@ -131,7 +215,12 @@ public class DebeziumJavaApplication { public IntegrationFlow debeziumInbound( DebeziumEngine.Builder> debeziumEngineBuilder) { - return IntegrationFlow.from(new DebeziumMessageProducer(debeziumEngineBuilder)) + return IntegrationFlow + .from(Debezium + .inboundChannelAdapter(debeziumEngineBuilder) + .headerNames("special*") + .contentType("application/json") + .enableBatch(false)) .handle(m -> System.out.println(new String((byte[]) m.getPayload()))) .get(); } diff --git a/src/reference/asciidoc/whats-new.adoc b/src/reference/asciidoc/whats-new.adoc index 155708b503b..84149fd0903 100644 --- a/src/reference/asciidoc/whats-new.adoc +++ b/src/reference/asciidoc/whats-new.adoc @@ -21,7 +21,7 @@ In general the project has been moved to the latest dependency versions. ==== Debezium Inbound Channel Adapter The Debezium Engine based Change Data Capture (CDC) channel adapter, that allows capturing database change events, converting them into Messages and streaming those to the outbound channels. -See <<./debezium.adoc#debezium-inbound, Debezium Inbound Channel Adapter>> for more information. +See <<./debezium.adoc#debezium, Debezium Support>> for more information. [[x6.2-general]] === General Changes