Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: dataplaneId null for older TransferProcesses #1534

Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion edc-controlplane/edc-controlplane-base/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ dependencies {
runtimeOnly(project(":edc-extensions:edr:edr-api-v2"))
runtimeOnly(project(":edc-extensions:edr:edr-callback"))
runtimeOnly(project(":edc-extensions:tokenrefresh-handler"))
runtimeOnly(project(":edc-extensions:transfer-dataplane-signaling"))
runtimeOnly(libs.edc.core.edrstore)
runtimeOnly(libs.edc.edr.store.receiver)
runtimeOnly(libs.edc.dpf.transfer.signaling)
runtimeOnly(libs.edc.controlplane.callback.staticendpoint)

// needed for BPN validation
Expand Down
27 changes: 27 additions & 0 deletions edc-extensions/transfer-dataplane-signaling/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/


plugins {
`java-library`
}

dependencies {
implementation(libs.edc.spi.web)
implementation(libs.edc.spi.dataplane.selector)
implementation(libs.edc.spi.transfer)
implementation(libs.edc.dpf.signaling.client)

testImplementation(libs.edc.junit)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
/*
* Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/

package org.eclipse.tractusx.edc.dataplane.transfer.signaling;

import org.eclipse.edc.connector.controlplane.asset.spi.domain.Asset;
import org.eclipse.edc.connector.controlplane.transfer.spi.flow.DataFlowController;
import org.eclipse.edc.connector.controlplane.transfer.spi.flow.DataFlowPropertiesProvider;
import org.eclipse.edc.connector.controlplane.transfer.spi.flow.FlowTypeExtractor;
import org.eclipse.edc.connector.controlplane.transfer.spi.types.DataFlowResponse;
import org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcess;
import org.eclipse.edc.connector.dataplane.selector.spi.DataPlaneSelectorService;
import org.eclipse.edc.connector.dataplane.selector.spi.client.DataPlaneClient;
import org.eclipse.edc.connector.dataplane.selector.spi.client.DataPlaneClientFactory;
import org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstance;
import org.eclipse.edc.policy.model.Policy;
import org.eclipse.edc.spi.response.StatusResult;
import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage;
import org.eclipse.edc.web.spi.configuration.context.ControlApiUrl;
import org.jetbrains.annotations.NotNull;

import java.util.Collection;
import java.util.Set;
import java.util.UUID;

import static java.util.Collections.emptySet;
import static java.util.stream.Collectors.toSet;
import static org.eclipse.edc.spi.response.ResponseStatus.FATAL_ERROR;

/**
* Implementation of {@link DataFlowController} that is compliant with the data plane signaling.
* <p>
* It handles all the transfer process where the transferType met the criteria defined in the format mapping of the
* signaling spec
*
* @see <a href="https://github.com/eclipse-edc/Connector/blob/main/docs/developer/data-plane-signaling/data-plane-signaling.md">Data plane signaling</a>
* @see <a href="https://github.com/eclipse-edc/Connector/blob/main/docs/developer/data-plane-signaling/data-plane-signaling-mapping.md">Data plane signaling transfer type mapping</a>
*/
public class DataPlaneSignalingFlowController implements DataFlowController {

private final ControlApiUrl callbackUrl;
private final DataPlaneSelectorService selectorClient;
private final DataPlaneClientFactory clientFactory;
private final DataFlowPropertiesProvider propertiesProvider;
private final String selectionStrategy;
private final FlowTypeExtractor flowTypeExtractor;

public DataPlaneSignalingFlowController(ControlApiUrl callbackUrl, DataPlaneSelectorService selectorClient,
DataFlowPropertiesProvider propertiesProvider, DataPlaneClientFactory clientFactory,
String selectionStrategy, FlowTypeExtractor flowTypeExtractor) {
this.callbackUrl = callbackUrl;
this.selectorClient = selectorClient;
this.propertiesProvider = propertiesProvider;
this.clientFactory = clientFactory;
this.selectionStrategy = selectionStrategy;
this.flowTypeExtractor = flowTypeExtractor;
}

@Override
public boolean canHandle(TransferProcess transferProcess) {
return flowTypeExtractor.extract(transferProcess.getTransferType()).succeeded();
}

@Override
public @NotNull StatusResult<DataFlowResponse> start(TransferProcess transferProcess, Policy policy) {
var flowType = flowTypeExtractor.extract(transferProcess.getTransferType());
if (flowType.failed()) {
return StatusResult.failure(FATAL_ERROR, flowType.getFailureDetail());
}

var propertiesResult = propertiesProvider.propertiesFor(transferProcess, policy);
if (propertiesResult.failed()) {
return StatusResult.failure(FATAL_ERROR, propertiesResult.getFailureDetail());
}

var selection = selectorClient.select(transferProcess.getContentDataAddress(), transferProcess.getTransferType(), selectionStrategy);
if (!selection.succeeded()) {
return StatusResult.failure(FATAL_ERROR, selection.getFailureDetail());
}

var dataFlowRequest = DataFlowStartMessage.Builder.newInstance()
.id(UUID.randomUUID().toString())
.processId(transferProcess.getId())
.sourceDataAddress(transferProcess.getContentDataAddress())
.destinationDataAddress(transferProcess.getDataDestination())
.participantId(policy.getAssignee())
.agreementId(transferProcess.getContractId())
.assetId(transferProcess.getAssetId())
.flowType(flowType.getContent())
.callbackAddress(callbackUrl != null ? callbackUrl.get() : null)
.properties(propertiesResult.getContent())
.build();

var dataPlaneInstance = selection.getContent();
return clientFactory.createClient(dataPlaneInstance)
.start(dataFlowRequest)
.map(it -> DataFlowResponse.Builder.newInstance()
.dataAddress(it.getDataAddress())
.dataPlaneId(dataPlaneInstance.getId())
.build()
);
}

@Override
public StatusResult<Void> suspend(TransferProcess transferProcess) {
return getClientForDataplane(transferProcess.getDataPlaneId())
.map(client -> client.suspend(transferProcess.getId()))
.orElse(f -> {
var message = "Failed to select the data plane for suspending the transfer process %s. %s"
.formatted(transferProcess.getId(), f.getFailureDetail());
return StatusResult.failure(FATAL_ERROR, message);
});
}

@Override
public StatusResult<Void> terminate(TransferProcess transferProcess) {
var dataPlaneId = transferProcess.getDataPlaneId();
if (dataPlaneId == null) {
return StatusResult.success();
}

return getClientForDataplane(dataPlaneId)
.map(client -> client.terminate(transferProcess.getId()))
.orElse(f -> {
var message = "Failed to select the data plane for terminating the transfer process %s. %s"
.formatted(transferProcess.getId(), f.getFailureDetail());
return StatusResult.failure(FATAL_ERROR, message);
});
}

@Override
public Set<String> transferTypesFor(Asset asset) {
var result = selectorClient.getAll();
if (result.failed()) {
return emptySet();
}

return result.getContent().stream()
.filter(it -> it.getAllowedSourceTypes().contains(asset.getDataAddress().getType()))
.map(DataPlaneInstance::getAllowedTransferTypes)
.flatMap(Collection::stream)
.collect(toSet());
}

private StatusResult<DataPlaneClient> getClientForDataplane(String id) {
return selectorClient.findById(id)
.map(clientFactory::createClient)
.map(StatusResult::success)
.orElse(f -> StatusResult.failure(FATAL_ERROR, "No data-plane found with id %s. %s".formatted(id, f.getFailureDetail())));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/

package org.eclipse.tractusx.edc.dataplane.transfer.signaling;

import org.eclipse.edc.connector.controlplane.transfer.spi.flow.DataFlowManager;
import org.eclipse.edc.connector.controlplane.transfer.spi.flow.DataFlowPropertiesProvider;
import org.eclipse.edc.connector.controlplane.transfer.spi.flow.FlowTypeExtractor;
import org.eclipse.edc.connector.dataplane.selector.spi.DataPlaneSelectorService;
import org.eclipse.edc.connector.dataplane.selector.spi.client.DataPlaneClientFactory;
import org.eclipse.edc.runtime.metamodel.annotation.Extension;
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
import org.eclipse.edc.runtime.metamodel.annotation.Setting;
import org.eclipse.edc.spi.response.StatusResult;
import org.eclipse.edc.spi.system.ServiceExtension;
import org.eclipse.edc.spi.system.ServiceExtensionContext;
import org.eclipse.edc.web.spi.configuration.context.ControlApiUrl;

import java.util.Map;

import static org.eclipse.tractusx.edc.dataplane.transfer.signaling.TransferDataPlaneSignalingExtension.NAME;

@Extension(NAME)
public class TransferDataPlaneSignalingExtension implements ServiceExtension {

protected static final String NAME = "Transfer Data Plane Signaling Extension";

private static final String DEFAULT_DATAPLANE_SELECTOR_STRATEGY = "random";

@Setting(value = "Defines strategy for Data Plane instance selection in case Data Plane is not embedded in current runtime", defaultValue = DEFAULT_DATAPLANE_SELECTOR_STRATEGY)
private static final String DPF_SELECTOR_STRATEGY = "edc.dataplane.client.selector.strategy";

@Inject
private DataFlowManager dataFlowManager;

@Inject(required = false)
private ControlApiUrl callbackUrl;

@Inject
private DataPlaneSelectorService selectorService;

@Inject
private DataPlaneClientFactory clientFactory;

@Inject(required = false)
private DataFlowPropertiesProvider propertiesProvider;

@Inject
private FlowTypeExtractor flowTypeExtractor;

@Override
public void initialize(ServiceExtensionContext context) {
var selectionStrategy = context.getSetting(DPF_SELECTOR_STRATEGY, DEFAULT_DATAPLANE_SELECTOR_STRATEGY);
var controller = new DataPlaneSignalingFlowController(callbackUrl, selectorService, getPropertiesProvider(),
clientFactory, selectionStrategy, flowTypeExtractor);
dataFlowManager.register(controller);
}

private DataFlowPropertiesProvider getPropertiesProvider() {
return propertiesProvider == null ? (tp, p) -> StatusResult.success(Map.of()) : propertiesProvider;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#################################################################################
# Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
#
# See the NOTICE file(s) distributed with this work for additional
# information regarding copyright ownership.
#
# This program and the accompanying materials are made available under the
# terms of the Apache License, Version 2.0 which is available at
# https://www.apache.org/licenses/LICENSE-2.0.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# SPDX-License-Identifier: Apache-2.0
#################################################################################

org.eclipse.tractusx.edc.dataplane.transfer.signaling.TransferDataPlaneSignalingExtension
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/

package org.eclipse.tractusx.edc.dataplane.transfer.signaling;

import org.eclipse.edc.connector.controlplane.transfer.spi.flow.DataFlowManager;
import org.eclipse.edc.junit.extensions.DependencyInjectionExtension;
import org.eclipse.edc.spi.system.ServiceExtensionContext;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

@ExtendWith(DependencyInjectionExtension.class)
class TransferDataPlaneSignalingExtensionTest {

private final DataFlowManager dataFlowManager = mock();

@BeforeEach
void setup(ServiceExtensionContext context) {
context.registerService(DataFlowManager.class, dataFlowManager);
}

@Test
void initialize(ServiceExtensionContext context, TransferDataPlaneSignalingExtension extension) {
extension.initialize(context);
verify(dataFlowManager).register(isA(DataPlaneSignalingFlowController.class));
}
}
1 change: 1 addition & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ edc-dpf-api-control = { module = "org.eclipse.edc:data-plane-control-api", versi
edc-dpf-api-public-v2 = { module = "org.eclipse.edc:data-plane-public-api-v2", version.ref = "edc" }

edc-dpf-api-signaling = { module = "org.eclipse.edc:data-plane-signaling-api", version.ref = "edc" }
edc-dpf-signaling-client = { module = "org.eclipse.edc:data-plane-signaling-client", version.ref = "edc" }
edc-data-plane-selector-control-api = { module = "org.eclipse.edc:data-plane-selector-control-api", version.ref = "edc" }
edc-data-plane-selector-client = { module = "org.eclipse.edc:data-plane-selector-client", version.ref = "edc" }

Expand Down
1 change: 1 addition & 0 deletions settings.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ include(":edc-extensions:cx-policy")
include(":edc-extensions:iatp:tx-iatp")
include(":edc-extensions:iatp:tx-iatp-sts-dim")
include(":edc-extensions:data-flow-properties-provider")
include(":edc-extensions:transfer-dataplane-signaling")

// extensions - data plane
include(":edc-extensions:dataplane:dataplane-proxy:edc-dataplane-proxy-consumer-api")
Expand Down
Loading