From 07b322f067dff52c0eca81de108b9bcb29ffdb70 Mon Sep 17 00:00:00 2001 From: bmg13 Date: Thu, 5 Sep 2024 12:00:31 +0100 Subject: [PATCH 1/5] Fix transfer dataplane signaling when transfer process dp_id is null. --- .../edc-controlplane-base/build.gradle.kts | 2 +- .../build.gradle.kts | 29 +++ .../DataPlaneSignalingFlowController.java | 168 ++++++++++++++++++ .../TransferDataPlaneSignalingExtension.java | 78 ++++++++ ...rg.eclipse.edc.spi.system.ServiceExtension | 20 +++ ...ansferDataPlaneSignalingExtensionTest.java | 48 +++++ settings.gradle.kts | 1 + 7 files changed, 345 insertions(+), 1 deletion(-) create mode 100644 edc-extensions/transfer-dataplane-signaling/build.gradle.kts create mode 100644 edc-extensions/transfer-dataplane-signaling/src/main/java/org/eclipse/tractusx/edc/dataplane/transfer/signaling/DataPlaneSignalingFlowController.java create mode 100644 edc-extensions/transfer-dataplane-signaling/src/main/java/org/eclipse/tractusx/edc/dataplane/transfer/signaling/TransferDataPlaneSignalingExtension.java create mode 100644 edc-extensions/transfer-dataplane-signaling/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension create mode 100644 edc-extensions/transfer-dataplane-signaling/src/test/java/org/eclipse/tractusx/edc/dataplane/transfer/signaling/TransferDataPlaneSignalingExtensionTest.java diff --git a/edc-controlplane/edc-controlplane-base/build.gradle.kts b/edc-controlplane/edc-controlplane-base/build.gradle.kts index e17edd8e0..2e06f0194 100644 --- a/edc-controlplane/edc-controlplane-base/build.gradle.kts +++ b/edc-controlplane/edc-controlplane-base/build.gradle.kts @@ -29,9 +29,9 @@ dependencies { runtimeOnly(project(":edc-extensions:edr:edr-api-v2")) runtimeOnly(project(":edc-extensions:edr:edr-callback")) runtimeOnly(project(":edc-extensions:tokenrefresh-handler")) + runtimeOnly(project(":edc-extensions:transfer-dataplane-signaling")) runtimeOnly(libs.edc.core.edrstore) runtimeOnly(libs.edc.edr.store.receiver) - runtimeOnly(libs.edc.dpf.transfer.signaling) runtimeOnly(libs.edc.controlplane.callback.staticendpoint) // needed for BPN validation diff --git a/edc-extensions/transfer-dataplane-signaling/build.gradle.kts b/edc-extensions/transfer-dataplane-signaling/build.gradle.kts new file mode 100644 index 000000000..1bf289e1c --- /dev/null +++ b/edc-extensions/transfer-dataplane-signaling/build.gradle.kts @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + + +plugins { + `java-library` +} + +dependencies { + implementation(libs.edc.spi.dataplane.dataplane) + implementation(libs.edc.spi.web) + implementation(libs.edc.spi.dataplane.selector) + implementation(libs.edc.spi.core) + implementation(libs.edc.spi.transfer) + implementation(libs.edc.spi.dataplane.transfer) + + testImplementation(libs.edc.junit) +} diff --git a/edc-extensions/transfer-dataplane-signaling/src/main/java/org/eclipse/tractusx/edc/dataplane/transfer/signaling/DataPlaneSignalingFlowController.java b/edc-extensions/transfer-dataplane-signaling/src/main/java/org/eclipse/tractusx/edc/dataplane/transfer/signaling/DataPlaneSignalingFlowController.java new file mode 100644 index 000000000..20794b713 --- /dev/null +++ b/edc-extensions/transfer-dataplane-signaling/src/main/java/org/eclipse/tractusx/edc/dataplane/transfer/signaling/DataPlaneSignalingFlowController.java @@ -0,0 +1,168 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.eclipse.tractusx.edc.dataplane.transfer.signaling; + +import org.eclipse.edc.connector.controlplane.asset.spi.domain.Asset; +import org.eclipse.edc.connector.controlplane.transfer.spi.flow.DataFlowController; +import org.eclipse.edc.connector.controlplane.transfer.spi.flow.DataFlowPropertiesProvider; +import org.eclipse.edc.connector.controlplane.transfer.spi.flow.FlowTypeExtractor; +import org.eclipse.edc.connector.controlplane.transfer.spi.types.DataFlowResponse; +import org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcess; +import org.eclipse.edc.connector.dataplane.selector.spi.DataPlaneSelectorService; +import org.eclipse.edc.connector.dataplane.selector.spi.client.DataPlaneClient; +import org.eclipse.edc.connector.dataplane.selector.spi.client.DataPlaneClientFactory; +import org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstance; +import org.eclipse.edc.policy.model.Policy; +import org.eclipse.edc.spi.response.StatusResult; +import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage; +import org.eclipse.edc.web.spi.configuration.context.ControlApiUrl; +import org.jetbrains.annotations.NotNull; + +import java.util.Collection; +import java.util.Set; +import java.util.UUID; + +import static java.util.Collections.emptySet; +import static java.util.stream.Collectors.toSet; +import static org.eclipse.edc.spi.response.ResponseStatus.FATAL_ERROR; + +/** + * Implementation of {@link DataFlowController} that is compliant with the data plane signaling. + *

+ * It handles all the transfer process where the transferType met the criteria defined in the format mapping of the + * signaling spec + * + * @see Data plane signaling + * @see Data plane signaling transfer type mapping + */ +public class DataPlaneSignalingFlowController implements DataFlowController { + + private final ControlApiUrl callbackUrl; + private final DataPlaneSelectorService selectorClient; + private final DataPlaneClientFactory clientFactory; + private final DataFlowPropertiesProvider propertiesProvider; + private final String selectionStrategy; + private final FlowTypeExtractor flowTypeExtractor; + + public DataPlaneSignalingFlowController(ControlApiUrl callbackUrl, DataPlaneSelectorService selectorClient, + DataFlowPropertiesProvider propertiesProvider, DataPlaneClientFactory clientFactory, + String selectionStrategy, FlowTypeExtractor flowTypeExtractor) { + this.callbackUrl = callbackUrl; + this.selectorClient = selectorClient; + this.propertiesProvider = propertiesProvider; + this.clientFactory = clientFactory; + this.selectionStrategy = selectionStrategy; + this.flowTypeExtractor = flowTypeExtractor; + } + + @Override + public boolean canHandle(TransferProcess transferProcess) { + return flowTypeExtractor.extract(transferProcess.getTransferType()).succeeded(); + } + + @Override + public @NotNull StatusResult start(TransferProcess transferProcess, Policy policy) { + var flowType = flowTypeExtractor.extract(transferProcess.getTransferType()); + if (flowType.failed()) { + return StatusResult.failure(FATAL_ERROR, flowType.getFailureDetail()); + } + + var propertiesResult = propertiesProvider.propertiesFor(transferProcess, policy); + if (propertiesResult.failed()) { + return StatusResult.failure(FATAL_ERROR, propertiesResult.getFailureDetail()); + } + + var selection = selectorClient.select(transferProcess.getContentDataAddress(), transferProcess.getTransferType(), selectionStrategy); + if (!selection.succeeded()) { + return StatusResult.failure(FATAL_ERROR, selection.getFailureDetail()); + } + + var dataFlowRequest = DataFlowStartMessage.Builder.newInstance() + .id(UUID.randomUUID().toString()) + .processId(transferProcess.getId()) + .sourceDataAddress(transferProcess.getContentDataAddress()) + .destinationDataAddress(transferProcess.getDataDestination()) + .participantId(policy.getAssignee()) + .agreementId(transferProcess.getContractId()) + .assetId(transferProcess.getAssetId()) + .flowType(flowType.getContent()) + .callbackAddress(callbackUrl != null ? callbackUrl.get() : null) + .properties(propertiesResult.getContent()) + .build(); + + var dataPlaneInstance = selection.getContent(); + return clientFactory.createClient(dataPlaneInstance) + .start(dataFlowRequest) + .map(it -> DataFlowResponse.Builder.newInstance() + .dataAddress(it.getDataAddress()) + .dataPlaneId(dataPlaneInstance.getId()) + .build() + ); + } + + @Override + public StatusResult suspend(TransferProcess transferProcess) { + return getClientForDataplane(transferProcess.getDataPlaneId()) + .map(client -> client.suspend(transferProcess.getId())) + .orElse(f -> { + var message = "Failed to select the data plane for suspending the transfer process %s. %s" + .formatted(transferProcess.getId(), f.getFailureDetail()); + return StatusResult.failure(FATAL_ERROR, message); + }); + } + + @Override + public StatusResult terminate(TransferProcess transferProcess) { + var dataPlaneId = transferProcess.getDataPlaneId(); + if (dataPlaneId == null) { + return StatusResult.success(); + } + + return getClientForDataplane(dataPlaneId) + .map(client -> client.terminate(transferProcess.getId())) + .orElse(f -> { + var message = "Failed to select the data plane for terminating the transfer process %s. %s" + .formatted(transferProcess.getId(), f.getFailureDetail()); + return StatusResult.failure(FATAL_ERROR, message); + }); + } + + @Override + public Set transferTypesFor(Asset asset) { + var result = selectorClient.getAll(); + if (result.failed()) { + return emptySet(); + } + + return result.getContent().stream() + .filter(it -> it.getAllowedSourceTypes().contains(asset.getDataAddress().getType())) + .map(DataPlaneInstance::getAllowedTransferTypes) + .flatMap(Collection::stream) + .collect(toSet()); + } + + private StatusResult getClientForDataplane(String id) { + return selectorClient.findById(id) + .map(clientFactory::createClient) + .map(StatusResult::success) + .orElse(f -> StatusResult.failure(FATAL_ERROR, "No data-plane found with id %s. %s".formatted(id, f.getFailureDetail()))); + } + +} diff --git a/edc-extensions/transfer-dataplane-signaling/src/main/java/org/eclipse/tractusx/edc/dataplane/transfer/signaling/TransferDataPlaneSignalingExtension.java b/edc-extensions/transfer-dataplane-signaling/src/main/java/org/eclipse/tractusx/edc/dataplane/transfer/signaling/TransferDataPlaneSignalingExtension.java new file mode 100644 index 000000000..453e7a6aa --- /dev/null +++ b/edc-extensions/transfer-dataplane-signaling/src/main/java/org/eclipse/tractusx/edc/dataplane/transfer/signaling/TransferDataPlaneSignalingExtension.java @@ -0,0 +1,78 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.eclipse.tractusx.edc.dataplane.transfer.signaling; + +import org.eclipse.edc.connector.controlplane.transfer.spi.flow.DataFlowManager; +import org.eclipse.edc.connector.controlplane.transfer.spi.flow.DataFlowPropertiesProvider; +import org.eclipse.edc.connector.controlplane.transfer.spi.flow.FlowTypeExtractor; +import org.eclipse.edc.connector.dataplane.selector.spi.DataPlaneSelectorService; +import org.eclipse.edc.connector.dataplane.selector.spi.client.DataPlaneClientFactory; +import org.eclipse.edc.runtime.metamodel.annotation.Extension; +import org.eclipse.edc.runtime.metamodel.annotation.Inject; +import org.eclipse.edc.runtime.metamodel.annotation.Setting; +import org.eclipse.edc.spi.response.StatusResult; +import org.eclipse.edc.spi.system.ServiceExtension; +import org.eclipse.edc.spi.system.ServiceExtensionContext; +import org.eclipse.edc.web.spi.configuration.context.ControlApiUrl; + +import java.util.Map; + +import static org.eclipse.tractusx.edc.dataplane.transfer.signaling.TransferDataPlaneSignalingExtension.NAME; + +@Extension(NAME) +public class TransferDataPlaneSignalingExtension implements ServiceExtension { + + protected static final String NAME = "Transfer Data Plane Signaling Extension"; + + private static final String DEFAULT_DATAPLANE_SELECTOR_STRATEGY = "random"; + + @Setting(value = "Defines strategy for Data Plane instance selection in case Data Plane is not embedded in current runtime", defaultValue = DEFAULT_DATAPLANE_SELECTOR_STRATEGY) + private static final String DPF_SELECTOR_STRATEGY = "edc.dataplane.client.selector.strategy"; + + @Inject + private DataFlowManager dataFlowManager; + + @Inject(required = false) + private ControlApiUrl callbackUrl; + + @Inject + private DataPlaneSelectorService selectorService; + + @Inject + private DataPlaneClientFactory clientFactory; + + @Inject(required = false) + private DataFlowPropertiesProvider propertiesProvider; + + @Inject + private FlowTypeExtractor flowTypeExtractor; + + @Override + public void initialize(ServiceExtensionContext context) { + var selectionStrategy = context.getSetting(DPF_SELECTOR_STRATEGY, DEFAULT_DATAPLANE_SELECTOR_STRATEGY); + var controller = new DataPlaneSignalingFlowController(callbackUrl, selectorService, getPropertiesProvider(), + clientFactory, selectionStrategy, flowTypeExtractor); + dataFlowManager.register(controller); + } + + private DataFlowPropertiesProvider getPropertiesProvider() { + return propertiesProvider == null ? (tp, p) -> StatusResult.success(Map.of()) : propertiesProvider; + } +} diff --git a/edc-extensions/transfer-dataplane-signaling/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension b/edc-extensions/transfer-dataplane-signaling/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension new file mode 100644 index 000000000..5ae1d5a7f --- /dev/null +++ b/edc-extensions/transfer-dataplane-signaling/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension @@ -0,0 +1,20 @@ +################################################################################# +# Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) +# +# See the NOTICE file(s) distributed with this work for additional +# information regarding copyright ownership. +# +# This program and the accompanying materials are made available under the +# terms of the Apache License, Version 2.0 which is available at +# https://www.apache.org/licenses/LICENSE-2.0. +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# SPDX-License-Identifier: Apache-2.0 +################################################################################# + +org.eclipse.tractusx.edc.dataplane.transfer.signaling.TransferDataPlaneSignalingExtension diff --git a/edc-extensions/transfer-dataplane-signaling/src/test/java/org/eclipse/tractusx/edc/dataplane/transfer/signaling/TransferDataPlaneSignalingExtensionTest.java b/edc-extensions/transfer-dataplane-signaling/src/test/java/org/eclipse/tractusx/edc/dataplane/transfer/signaling/TransferDataPlaneSignalingExtensionTest.java new file mode 100644 index 000000000..a2a9c5ba2 --- /dev/null +++ b/edc-extensions/transfer-dataplane-signaling/src/test/java/org/eclipse/tractusx/edc/dataplane/transfer/signaling/TransferDataPlaneSignalingExtensionTest.java @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.eclipse.tractusx.edc.dataplane.transfer.signaling; + +import org.eclipse.edc.connector.controlplane.transfer.spi.flow.DataFlowManager; +import org.eclipse.edc.junit.extensions.DependencyInjectionExtension; +import org.eclipse.edc.spi.system.ServiceExtensionContext; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +import static org.mockito.ArgumentMatchers.isA; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +@ExtendWith(DependencyInjectionExtension.class) +class TransferDataPlaneSignalingExtensionTest { + + private final DataFlowManager dataFlowManager = mock(); + + @BeforeEach + void setup(ServiceExtensionContext context) { + context.registerService(DataFlowManager.class, dataFlowManager); + } + + @Test + void initialize(ServiceExtensionContext context, TransferDataPlaneSignalingExtension extension) { + extension.initialize(context); + verify(dataFlowManager).register(isA(DataPlaneSignalingFlowController.class)); + } +} diff --git a/settings.gradle.kts b/settings.gradle.kts index 7888172f9..ea6d5696b 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -50,6 +50,7 @@ include(":edc-extensions:cx-policy") include(":edc-extensions:iatp:tx-iatp") include(":edc-extensions:iatp:tx-iatp-sts-dim") include(":edc-extensions:data-flow-properties-provider") +include(":edc-extensions:transfer-dataplane-signaling") // extensions - data plane include(":edc-extensions:dataplane:dataplane-proxy:edc-dataplane-proxy-consumer-api") From 1518e924af67fc8bc98ab87169b4168c88aa8629 Mon Sep 17 00:00:00 2001 From: bmg13 Date: Thu, 5 Sep 2024 12:07:03 +0100 Subject: [PATCH 2/5] Update dependencies. --- edc-extensions/transfer-dataplane-signaling/build.gradle.kts | 3 --- 1 file changed, 3 deletions(-) diff --git a/edc-extensions/transfer-dataplane-signaling/build.gradle.kts b/edc-extensions/transfer-dataplane-signaling/build.gradle.kts index 1bf289e1c..8c83e687e 100644 --- a/edc-extensions/transfer-dataplane-signaling/build.gradle.kts +++ b/edc-extensions/transfer-dataplane-signaling/build.gradle.kts @@ -18,12 +18,9 @@ plugins { } dependencies { - implementation(libs.edc.spi.dataplane.dataplane) implementation(libs.edc.spi.web) implementation(libs.edc.spi.dataplane.selector) - implementation(libs.edc.spi.core) implementation(libs.edc.spi.transfer) - implementation(libs.edc.spi.dataplane.transfer) testImplementation(libs.edc.junit) } From 09ae6604a11dcb12b8b2b08af25690b374752104 Mon Sep 17 00:00:00 2001 From: bmg13 Date: Thu, 5 Sep 2024 13:09:50 +0100 Subject: [PATCH 3/5] Fix transfer signaling client. --- edc-extensions/transfer-dataplane-signaling/build.gradle.kts | 1 + gradle/libs.versions.toml | 1 + 2 files changed, 2 insertions(+) diff --git a/edc-extensions/transfer-dataplane-signaling/build.gradle.kts b/edc-extensions/transfer-dataplane-signaling/build.gradle.kts index 8c83e687e..09f245e65 100644 --- a/edc-extensions/transfer-dataplane-signaling/build.gradle.kts +++ b/edc-extensions/transfer-dataplane-signaling/build.gradle.kts @@ -21,6 +21,7 @@ dependencies { implementation(libs.edc.spi.web) implementation(libs.edc.spi.dataplane.selector) implementation(libs.edc.spi.transfer) + implementation(libs.edc.dpf.signaling.client) testImplementation(libs.edc.junit) } diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index f2153886a..964a89cf4 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -184,6 +184,7 @@ edc-dpf-api-control = { module = "org.eclipse.edc:data-plane-control-api", versi edc-dpf-api-public-v2 = { module = "org.eclipse.edc:data-plane-public-api-v2", version.ref = "edc" } edc-dpf-api-signaling = { module = "org.eclipse.edc:data-plane-signaling-api", version.ref = "edc" } +edc-dpf-signaling-client = { module = "org.eclipse.edc:data-plane-signaling-client", version.ref = "edc" } edc-data-plane-selector-control-api = { module = "org.eclipse.edc:data-plane-selector-control-api", version.ref = "edc" } edc-data-plane-selector-client = { module = "org.eclipse.edc:data-plane-selector-client", version.ref = "edc" } From c86698ae5769b5fe4c00a908df3a248cb0375ab8 Mon Sep 17 00:00:00 2001 From: bmg13 Date: Tue, 10 Sep 2024 09:09:10 +0100 Subject: [PATCH 4/5] Add DataPlaneSignalingFlowControllerTest class. --- .../DataPlaneSignalingFlowControllerTest.java | 387 ++++++++++++++++++ 1 file changed, 387 insertions(+) create mode 100644 edc-extensions/transfer-dataplane-signaling/src/test/java/org/eclipse/tractusx/edc/dataplane/transfer/signaling/flow/DataPlaneSignalingFlowControllerTest.java diff --git a/edc-extensions/transfer-dataplane-signaling/src/test/java/org/eclipse/tractusx/edc/dataplane/transfer/signaling/flow/DataPlaneSignalingFlowControllerTest.java b/edc-extensions/transfer-dataplane-signaling/src/test/java/org/eclipse/tractusx/edc/dataplane/transfer/signaling/flow/DataPlaneSignalingFlowControllerTest.java new file mode 100644 index 000000000..d18efb055 --- /dev/null +++ b/edc-extensions/transfer-dataplane-signaling/src/test/java/org/eclipse/tractusx/edc/dataplane/transfer/signaling/flow/DataPlaneSignalingFlowControllerTest.java @@ -0,0 +1,387 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.connector.controlplane.transfer.dataplane.flow; + +import org.eclipse.edc.connector.controlplane.asset.spi.domain.Asset; +import org.eclipse.edc.connector.controlplane.transfer.spi.flow.DataFlowPropertiesProvider; +import org.eclipse.edc.connector.controlplane.transfer.spi.flow.TransferTypeParser; +import org.eclipse.edc.connector.controlplane.transfer.spi.types.DataFlowResponse; +import org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcess; +import org.eclipse.edc.connector.dataplane.selector.spi.DataPlaneSelectorService; +import org.eclipse.edc.connector.dataplane.selector.spi.client.DataPlaneClient; +import org.eclipse.edc.connector.dataplane.selector.spi.client.DataPlaneClientFactory; +import org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstance; +import org.eclipse.edc.policy.model.Policy; +import org.eclipse.edc.spi.response.ResponseStatus; +import org.eclipse.edc.spi.response.StatusResult; +import org.eclipse.edc.spi.result.Result; +import org.eclipse.edc.spi.result.ServiceResult; +import org.eclipse.edc.spi.types.domain.DataAddress; +import org.eclipse.edc.spi.types.domain.transfer.DataFlowResponseMessage; +import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage; +import org.eclipse.edc.spi.types.domain.transfer.FlowType; +import org.eclipse.edc.spi.types.domain.transfer.TransferType; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; + +import java.net.URI; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.eclipse.edc.junit.assertions.AbstractResultAssert.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.when; + +public class DataPlaneSignalingFlowControllerTest { + + private static final String HTTP_DATA_PULL = "HttpData-PULL"; + private final DataPlaneClient dataPlaneClient = mock(); + private final DataPlaneClientFactory dataPlaneClientFactory = mock(); + private final DataPlaneSelectorService selectorService = mock(); + private final DataFlowPropertiesProvider propertiesProvider = mock(); + private final TransferTypeParser transferTypeParser = mock(); + + private final DataPlaneSignalingFlowController flowController = new DataPlaneSignalingFlowController( + () -> URI.create("http://localhost"), selectorService, propertiesProvider, dataPlaneClientFactory, + "random", transferTypeParser); + + @Nested + class CanHandle { + @Test + void shouldReturnTrue_whenFlowTypeIsValid() { + when(transferTypeParser.parse(any())).thenReturn(Result.success(new TransferType("Valid", FlowType.PUSH))); + var transferProcess = transferProcess("Custom", "Valid-PUSH"); + + var result = flowController.canHandle(transferProcess); + + assertThat(result).isTrue(); + } + + @Test + void shouldReturnFalse_whenFlowTypeIsNotValid() { + when(transferTypeParser.parse(any())).thenReturn(Result.failure("cannot parse")); + var transferProcess = transferProcess("Custom", "Invalid-ANY"); + + var result = flowController.canHandle(transferProcess); + + assertThat(result).isFalse(); + } + } + + @Nested + class InitiateFlow { + @Test + void transferSuccess() { + when(transferTypeParser.parse(any())).thenReturn(Result.success(new TransferType("Valid", FlowType.PULL))); + var source = testDataAddress(); + var policy = Policy.Builder.newInstance().assignee("participantId").build(); + var transferProcess = transferProcessBuilder() + .transferType("transferType") + .contentDataAddress(testDataAddress()) + .build(); + + var customProperties = Map.of("foo", "bar"); + when(propertiesProvider.propertiesFor(any(), any())).thenReturn(StatusResult.success(customProperties)); + when(dataPlaneClient.start(any(DataFlowStartMessage.class))).thenReturn(StatusResult.success(mock(DataFlowResponseMessage.class))); + var dataPlaneInstance = createDataPlaneInstance(); + when(selectorService.select(any(), anyString(), any())).thenReturn(ServiceResult.success(dataPlaneInstance)); + when(dataPlaneClientFactory.createClient(any())).thenReturn(dataPlaneClient); + + var result = flowController.start(transferProcess, policy); + + assertThat(result).isSucceeded().extracting(DataFlowResponse::getDataPlaneId).isEqualTo(dataPlaneInstance.getId()); + var captor = ArgumentCaptor.forClass(DataFlowStartMessage.class); + verify(dataPlaneClient).start(captor.capture()); + var captured = captor.getValue(); + assertThat(captured.getProcessId()).isEqualTo(transferProcess.getId()); + assertThat(captured.getSourceDataAddress()).usingRecursiveComparison().isEqualTo(source); + assertThat(captured.getDestinationDataAddress()).usingRecursiveComparison().isEqualTo(transferProcess.getDataDestination()); + assertThat(captured.getParticipantId()).isEqualTo(policy.getAssignee()); + assertThat(captured.getAgreementId()).isEqualTo(transferProcess.getContractId()); + assertThat(captured.getAssetId()).isEqualTo(transferProcess.getAssetId()); + assertThat(captured.getFlowType()).isEqualTo(FlowType.PULL); + assertThat(captured.getProperties()).containsAllEntriesOf(customProperties); + assertThat(captured.getCallbackAddress()).isNotNull(); + } + + @Test + void transferSuccess_withReturnedDataAddress() { + when(transferTypeParser.parse(any())).thenReturn(Result.success(new TransferType("Valid", FlowType.PULL))); + var policy = Policy.Builder.newInstance().assignee("participantId").build(); + var transferProcess = transferProcessBuilder() + .transferType(HTTP_DATA_PULL) + .contentDataAddress(testDataAddress()) + .build(); + + var response = mock(DataFlowResponseMessage.class); + when(response.getDataAddress()).thenReturn(DataAddress.Builder.newInstance().type("type").build()); + when(propertiesProvider.propertiesFor(any(), any())).thenReturn(StatusResult.success(Map.of())); + when(dataPlaneClient.start(any(DataFlowStartMessage.class))).thenReturn(StatusResult.success(response)); + var dataPlaneInstance = createDataPlaneInstance(); + when(selectorService.select(any(), eq(HTTP_DATA_PULL), any())).thenReturn(ServiceResult.success(dataPlaneInstance)); + when(dataPlaneClientFactory.createClient(any())).thenReturn(dataPlaneClient); + + var result = flowController.start(transferProcess, policy); + + assertThat(result).isSucceeded() + .satisfies(dataFlowResponse -> { + assertThat(dataFlowResponse.getDataPlaneId()).isEqualTo(dataPlaneInstance.getId()); + assertThat(dataFlowResponse.getDataAddress()).isNotNull(); + }); + } + + @Test + void shouldFail_whenNoDataplaneSelected() { + when(transferTypeParser.parse(any())).thenReturn(Result.success(new TransferType("Valid", FlowType.PULL))); + var transferProcess = transferProcessBuilder() + .contentDataAddress(testDataAddress()) + .transferType(HTTP_DATA_PULL) + .build(); + + when(propertiesProvider.propertiesFor(any(), any())).thenReturn(StatusResult.success(Map.of())); + when(selectorService.select(any(), anyString(), any())).thenReturn(ServiceResult.notFound("no dataplane found")); + + var result = flowController.start(transferProcess, Policy.Builder.newInstance().build()); + + assertThat(result).isFailed(); + } + + @Test + void invalidTransferType() { + when(transferTypeParser.parse(any())).thenReturn(Result.failure("cannot parse")); + var transferProcess = transferProcessBuilder() + .contentDataAddress(testDataAddress()) + .transferType("invalid") + .build(); + + var result = flowController.start(transferProcess, Policy.Builder.newInstance().build()); + + assertThat(result).isFailed().detail().contains("cannot parse"); + } + + @Test + void returnFailedResult_whenPropertiesResolveFails() { + when(transferTypeParser.parse(any())).thenReturn(Result.success(new TransferType("Valid", FlowType.PULL))); + var errorMsg = "error"; + var transferProcess = transferProcessBuilder() + .contentDataAddress(testDataAddress()) + .transferType(HTTP_DATA_PULL) + .build(); + + when(propertiesProvider.propertiesFor(any(), any())).thenReturn(StatusResult.failure(ResponseStatus.FATAL_ERROR, errorMsg)); + var result = flowController.start(transferProcess, Policy.Builder.newInstance().build()); + + assertThat(result.failed()).isTrue(); + assertThat(result.getFailureMessages()).allSatisfy(s -> assertThat(s).contains(errorMsg)); + } + + @Test + void returnFailedResultIfTransferFails() { + when(transferTypeParser.parse(any())).thenReturn(Result.success(new TransferType("Valid", FlowType.PULL))); + var errorMsg = "error"; + var transferProcess = transferProcessBuilder() + .contentDataAddress(testDataAddress()) + .transferType(HTTP_DATA_PULL) + .build(); + + when(propertiesProvider.propertiesFor(any(), any())).thenReturn(StatusResult.success(Map.of())); + when(dataPlaneClient.start(any())).thenReturn(StatusResult.failure(ResponseStatus.FATAL_ERROR, errorMsg)); + var dataPlaneInstance = createDataPlaneInstance(); + when(selectorService.select(any(), anyString(), any())).thenReturn(ServiceResult.success(dataPlaneInstance)); + when(dataPlaneClientFactory.createClient(any())).thenReturn(dataPlaneClient); + + var result = flowController.start(transferProcess, Policy.Builder.newInstance().build()); + + verify(dataPlaneClient).start(any()); + + assertThat(result.failed()).isTrue(); + assertThat(result.getFailureMessages()).allSatisfy(s -> assertThat(s).contains(errorMsg)); + } + } + + @Nested + class Terminate { + + @Test + void shouldCallTerminateOnTheRightDataPlane() { + var dataPlaneInstance = dataPlaneInstanceBuilder().id("dataPlaneId").build(); + var transferProcess = transferProcessBuilder() + .id("transferProcessId") + .contentDataAddress(testDataAddress()) + .dataPlaneId("dataPlaneId") + .build(); + when(dataPlaneClient.terminate(any())).thenReturn(StatusResult.success()); + when(dataPlaneClientFactory.createClient(any())).thenReturn(dataPlaneClient); + when(selectorService.findById(any())).thenReturn(ServiceResult.success(dataPlaneInstance)); + + var result = flowController.terminate(transferProcess); + + assertThat(result).isSucceeded(); + verify(dataPlaneClient).terminate("transferProcessId"); + verify(dataPlaneClientFactory).createClient(dataPlaneInstance); + } + + @Test + void shouldFail_whenDataPlaneNotFound() { + var transferProcess = transferProcessBuilder() + .id("transferProcessId") + .contentDataAddress(testDataAddress()) + .dataPlaneId("invalid") + .build(); + when(dataPlaneClient.terminate(any())).thenReturn(StatusResult.success()); + when(dataPlaneClientFactory.createClient(any())).thenReturn(dataPlaneClient); + when(selectorService.findById(any())).thenReturn(ServiceResult.notFound("not found")); + + var result = flowController.terminate(transferProcess); + + assertThat(result).isFailed().detail().contains("Failed to select the data plane for terminating the transfer process"); + } + + @Test // a null dataPlaneId means that the flow has not been started so it can be considered as already terminated + void shouldReturnSuccess_whenDataPlaneIdIsNull() { + var transferProcess = transferProcessBuilder() + .id("transferProcessId") + .contentDataAddress(testDataAddress()) + .dataPlaneId(null) + .build(); + + var result = flowController.terminate(transferProcess); + + assertThat(result).isSucceeded(); + verifyNoInteractions(dataPlaneClient, dataPlaneClientFactory, selectorService); + } + } + + @Nested + class Suspend { + + @Test + void shouldCallTerminate() { + var transferProcess = TransferProcess.Builder.newInstance() + .id("transferProcessId") + .contentDataAddress(testDataAddress()) + .dataPlaneId("dataPlaneId") + .build(); + when(dataPlaneClient.suspend(any())).thenReturn(StatusResult.success()); + var dataPlaneInstance = dataPlaneInstanceBuilder().id("dataPlaneId").build(); + when(dataPlaneClientFactory.createClient(any())).thenReturn(dataPlaneClient); + when(selectorService.findById(any())).thenReturn(ServiceResult.success(dataPlaneInstance)); + + var result = flowController.suspend(transferProcess); + + assertThat(result).isSucceeded(); + verify(dataPlaneClient).suspend("transferProcessId"); + verify(dataPlaneClientFactory).createClient(dataPlaneInstance); + } + + @Test + void shouldFail_whenDataPlaneDoesNotExist() { + var transferProcess = TransferProcess.Builder.newInstance() + .id("transferProcessId") + .contentDataAddress(testDataAddress()) + .dataPlaneId("invalid") + .build(); + when(selectorService.findById(any())).thenReturn(ServiceResult.notFound("not found")); + + var result = flowController.suspend(transferProcess); + + assertThat(result).isFailed().detail().contains("Failed to select the data plane for suspending the transfer process"); + verifyNoInteractions(dataPlaneClient, dataPlaneClientFactory); + } + + @Test + void shouldFail_whenDataPlaneIdIsNull() { + var transferProcess = TransferProcess.Builder.newInstance() + .id("transferProcessId") + .contentDataAddress(testDataAddress()) + .dataPlaneId(null) + .build(); + + var result = flowController.suspend(transferProcess); + + assertThat(result).isFailed().detail().contains("Failed to select the data plane for suspending the transfer process"); + verifyNoInteractions(dataPlaneClient, dataPlaneClientFactory, selectorService); + } + + } + + @Nested + class TransferTypes { + + @Test + void transferTypes_shouldReturnTypesForSpecifiedAsset() { + when(selectorService.getAll()).thenReturn(ServiceResult.success(List.of( + dataPlaneInstanceBuilder().allowedTransferType("Custom-PUSH").allowedSourceType("TargetSrc").allowedDestType("TargetDest").build(), + dataPlaneInstanceBuilder().allowedTransferType("Custom-PULL").allowedSourceType("TargetSrc").allowedDestType("AnotherTargetDest").build(), + dataPlaneInstanceBuilder().allowedSourceType("AnotherSrc").allowedDestType("ThisWontBeListed").build() + ))); + var asset = Asset.Builder.newInstance().dataAddress(DataAddress.Builder.newInstance().type("TargetSrc").build()).build(); + + var transferTypes = flowController.transferTypesFor(asset); + + assertThat(transferTypes).containsExactly("Custom-PUSH", "Custom-PULL"); + } + + @Test + void shouldReturnEmptyList_whenCannotGetDataplaneInstances() { + when(selectorService.getAll()).thenReturn(ServiceResult.unexpected("error")); + var asset = Asset.Builder.newInstance().dataAddress(DataAddress.Builder.newInstance().type("TargetSrc").build()).build(); + + var transferTypes = flowController.transferTypesFor(asset); + + assertThat(transferTypes).isEmpty(); + } + } + + @NotNull + private DataPlaneInstance.Builder dataPlaneInstanceBuilder() { + return DataPlaneInstance.Builder.newInstance().url("http://any"); + } + + private DataPlaneInstance createDataPlaneInstance() { + return dataPlaneInstanceBuilder().build(); + } + + private DataAddress testDataAddress() { + return DataAddress.Builder.newInstance().type("test-type").build(); + } + + private TransferProcess transferProcess(String destinationType, String transferType) { + return TransferProcess.Builder.newInstance() + .transferType(transferType) + .dataDestination(DataAddress.Builder.newInstance().type(destinationType).build()) + .build(); + } + + private TransferProcess.Builder transferProcessBuilder() { + return TransferProcess.Builder.newInstance() + .correlationId(UUID.randomUUID().toString()) + .protocol("test-protocol") + .contractId(UUID.randomUUID().toString()) + .assetId(UUID.randomUUID().toString()) + .counterPartyAddress("test.connector.address") + .dataDestination(DataAddress.Builder.newInstance().type("test").build()); + } + +} From 60ebfe166284d4f05ef939b09c2f1f4ff3861c40 Mon Sep 17 00:00:00 2001 From: bmg13 Date: Tue, 10 Sep 2024 09:35:23 +0100 Subject: [PATCH 5/5] Add DataPlaneSignalingFlowControllerTest class. --- .../DataPlaneSignalingFlowControllerTest.java | 91 +++++++++---------- 1 file changed, 42 insertions(+), 49 deletions(-) diff --git a/edc-extensions/transfer-dataplane-signaling/src/test/java/org/eclipse/tractusx/edc/dataplane/transfer/signaling/flow/DataPlaneSignalingFlowControllerTest.java b/edc-extensions/transfer-dataplane-signaling/src/test/java/org/eclipse/tractusx/edc/dataplane/transfer/signaling/flow/DataPlaneSignalingFlowControllerTest.java index d18efb055..8ebf8678e 100644 --- a/edc-extensions/transfer-dataplane-signaling/src/test/java/org/eclipse/tractusx/edc/dataplane/transfer/signaling/flow/DataPlaneSignalingFlowControllerTest.java +++ b/edc-extensions/transfer-dataplane-signaling/src/test/java/org/eclipse/tractusx/edc/dataplane/transfer/signaling/flow/DataPlaneSignalingFlowControllerTest.java @@ -1,22 +1,27 @@ /* - * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft * - * This program and the accompanying materials are made available under the - * terms of the Apache License, Version 2.0 which is available at - * https://www.apache.org/licenses/LICENSE-2.0 + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. * - * SPDX-License-Identifier: Apache-2.0 + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0. * - * Contributors: - * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. * + * SPDX-License-Identifier: Apache-2.0 */ -package org.eclipse.edc.connector.controlplane.transfer.dataplane.flow; +package org.eclipse.tractusx.edc.dataplane.transfer.signaling.flow; import org.eclipse.edc.connector.controlplane.asset.spi.domain.Asset; import org.eclipse.edc.connector.controlplane.transfer.spi.flow.DataFlowPropertiesProvider; -import org.eclipse.edc.connector.controlplane.transfer.spi.flow.TransferTypeParser; +import org.eclipse.edc.connector.controlplane.transfer.spi.flow.FlowTypeExtractor; import org.eclipse.edc.connector.controlplane.transfer.spi.types.DataFlowResponse; import org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcess; import org.eclipse.edc.connector.dataplane.selector.spi.DataPlaneSelectorService; @@ -26,13 +31,12 @@ import org.eclipse.edc.policy.model.Policy; import org.eclipse.edc.spi.response.ResponseStatus; import org.eclipse.edc.spi.response.StatusResult; -import org.eclipse.edc.spi.result.Result; import org.eclipse.edc.spi.result.ServiceResult; import org.eclipse.edc.spi.types.domain.DataAddress; import org.eclipse.edc.spi.types.domain.transfer.DataFlowResponseMessage; import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage; import org.eclipse.edc.spi.types.domain.transfer.FlowType; -import org.eclipse.edc.spi.types.domain.transfer.TransferType; +import org.eclipse.tractusx.edc.dataplane.transfer.signaling.DataPlaneSignalingFlowController; import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; @@ -60,17 +64,17 @@ public class DataPlaneSignalingFlowControllerTest { private final DataPlaneClientFactory dataPlaneClientFactory = mock(); private final DataPlaneSelectorService selectorService = mock(); private final DataFlowPropertiesProvider propertiesProvider = mock(); - private final TransferTypeParser transferTypeParser = mock(); + private final FlowTypeExtractor flowTypeExtractor = mock(); private final DataPlaneSignalingFlowController flowController = new DataPlaneSignalingFlowController( () -> URI.create("http://localhost"), selectorService, propertiesProvider, dataPlaneClientFactory, - "random", transferTypeParser); + "random", flowTypeExtractor); @Nested class CanHandle { @Test void shouldReturnTrue_whenFlowTypeIsValid() { - when(transferTypeParser.parse(any())).thenReturn(Result.success(new TransferType("Valid", FlowType.PUSH))); + when(flowTypeExtractor.extract(any())).thenReturn(StatusResult.success(FlowType.PUSH)); var transferProcess = transferProcess("Custom", "Valid-PUSH"); var result = flowController.canHandle(transferProcess); @@ -80,7 +84,7 @@ void shouldReturnTrue_whenFlowTypeIsValid() { @Test void shouldReturnFalse_whenFlowTypeIsNotValid() { - when(transferTypeParser.parse(any())).thenReturn(Result.failure("cannot parse")); + when(flowTypeExtractor.extract(any())).thenReturn(StatusResult.failure(ResponseStatus.FATAL_ERROR)); var transferProcess = transferProcess("Custom", "Invalid-ANY"); var result = flowController.canHandle(transferProcess); @@ -93,7 +97,7 @@ void shouldReturnFalse_whenFlowTypeIsNotValid() { class InitiateFlow { @Test void transferSuccess() { - when(transferTypeParser.parse(any())).thenReturn(Result.success(new TransferType("Valid", FlowType.PULL))); + when(flowTypeExtractor.extract(any())).thenReturn(StatusResult.success(FlowType.PULL)); var source = testDataAddress(); var policy = Policy.Builder.newInstance().assignee("participantId").build(); var transferProcess = transferProcessBuilder() @@ -127,7 +131,7 @@ void transferSuccess() { @Test void transferSuccess_withReturnedDataAddress() { - when(transferTypeParser.parse(any())).thenReturn(Result.success(new TransferType("Valid", FlowType.PULL))); + when(flowTypeExtractor.extract(any())).thenReturn(StatusResult.success(FlowType.PULL)); var policy = Policy.Builder.newInstance().assignee("participantId").build(); var transferProcess = transferProcessBuilder() .transferType(HTTP_DATA_PULL) @@ -153,7 +157,7 @@ void transferSuccess_withReturnedDataAddress() { @Test void shouldFail_whenNoDataplaneSelected() { - when(transferTypeParser.parse(any())).thenReturn(Result.success(new TransferType("Valid", FlowType.PULL))); + when(flowTypeExtractor.extract(any())).thenReturn(StatusResult.success(FlowType.PULL)); var transferProcess = transferProcessBuilder() .contentDataAddress(testDataAddress()) .transferType(HTTP_DATA_PULL) @@ -167,9 +171,23 @@ void shouldFail_whenNoDataplaneSelected() { assertThat(result).isFailed(); } + @Test // a null dataPlaneId means that the flow has not been started so it can be considered as already terminated + void shouldReturnSuccess_whenDataPlaneIdIsNull() { + var transferProcess = transferProcessBuilder() + .id("transferProcessId") + .contentDataAddress(testDataAddress()) + .dataPlaneId(null) + .build(); + + var result = flowController.terminate(transferProcess); + + assertThat(result).isSucceeded(); + verifyNoInteractions(dataPlaneClient, dataPlaneClientFactory, selectorService); + } + @Test void invalidTransferType() { - when(transferTypeParser.parse(any())).thenReturn(Result.failure("cannot parse")); + when(flowTypeExtractor.extract(any())).thenReturn(StatusResult.failure(ResponseStatus.FATAL_ERROR, "error")); var transferProcess = transferProcessBuilder() .contentDataAddress(testDataAddress()) .transferType("invalid") @@ -177,12 +195,12 @@ void invalidTransferType() { var result = flowController.start(transferProcess, Policy.Builder.newInstance().build()); - assertThat(result).isFailed().detail().contains("cannot parse"); + assertThat(result).isFailed().messages().containsOnly("error"); } @Test void returnFailedResult_whenPropertiesResolveFails() { - when(transferTypeParser.parse(any())).thenReturn(Result.success(new TransferType("Valid", FlowType.PULL))); + when(flowTypeExtractor.extract(any())).thenReturn(StatusResult.success(FlowType.PULL)); var errorMsg = "error"; var transferProcess = transferProcessBuilder() .contentDataAddress(testDataAddress()) @@ -198,7 +216,7 @@ void returnFailedResult_whenPropertiesResolveFails() { @Test void returnFailedResultIfTransferFails() { - when(transferTypeParser.parse(any())).thenReturn(Result.success(new TransferType("Valid", FlowType.PULL))); + when(flowTypeExtractor.extract(any())).thenReturn(StatusResult.success(FlowType.PULL)); var errorMsg = "error"; var transferProcess = transferProcessBuilder() .contentDataAddress(testDataAddress()) @@ -257,20 +275,6 @@ void shouldFail_whenDataPlaneNotFound() { assertThat(result).isFailed().detail().contains("Failed to select the data plane for terminating the transfer process"); } - - @Test // a null dataPlaneId means that the flow has not been started so it can be considered as already terminated - void shouldReturnSuccess_whenDataPlaneIdIsNull() { - var transferProcess = transferProcessBuilder() - .id("transferProcessId") - .contentDataAddress(testDataAddress()) - .dataPlaneId(null) - .build(); - - var result = flowController.terminate(transferProcess); - - assertThat(result).isSucceeded(); - verifyNoInteractions(dataPlaneClient, dataPlaneClientFactory, selectorService); - } } @Nested @@ -302,6 +306,8 @@ void shouldFail_whenDataPlaneDoesNotExist() { .contentDataAddress(testDataAddress()) .dataPlaneId("invalid") .build(); + when(dataPlaneClient.suspend(any())).thenReturn(StatusResult.success()); + when(dataPlaneClientFactory.createClient(any())).thenReturn(dataPlaneClient); when(selectorService.findById(any())).thenReturn(ServiceResult.notFound("not found")); var result = flowController.suspend(transferProcess); @@ -310,19 +316,6 @@ void shouldFail_whenDataPlaneDoesNotExist() { verifyNoInteractions(dataPlaneClient, dataPlaneClientFactory); } - @Test - void shouldFail_whenDataPlaneIdIsNull() { - var transferProcess = TransferProcess.Builder.newInstance() - .id("transferProcessId") - .contentDataAddress(testDataAddress()) - .dataPlaneId(null) - .build(); - - var result = flowController.suspend(transferProcess); - - assertThat(result).isFailed().detail().contains("Failed to select the data plane for suspending the transfer process"); - verifyNoInteractions(dataPlaneClient, dataPlaneClientFactory, selectorService); - } }