Skip to content

Commit

Permalink
fix: handle fatal errors in dsp http dispatcher delegate (#3203)
Browse files Browse the repository at this point in the history
feat: handle fatal errors in dsp http dispatcher delegate
  • Loading branch information
ndr-brt committed Jun 21, 2023
1 parent 2e3ee8c commit 176aedd
Show file tree
Hide file tree
Showing 41 changed files with 780 additions and 262 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
import org.eclipse.edc.spi.EdcException;
import org.eclipse.edc.spi.message.RemoteMessageDispatcher;
import org.eclipse.edc.spi.message.RemoteMessageDispatcherRegistry;
import org.eclipse.edc.spi.response.StatusResult;
import org.eclipse.edc.spi.types.domain.message.RemoteMessage;
import org.jetbrains.annotations.Nullable;

import java.util.HashMap;
import java.util.Map;
Expand All @@ -38,22 +38,14 @@ public void register(RemoteMessageDispatcher dispatcher) {
}

@Override
public <T> CompletableFuture<T> send(Class<T> responseType, RemoteMessage message) {
public <T> CompletableFuture<StatusResult<T>> dispatch(Class<T> responseType, RemoteMessage message) {
Objects.requireNonNull(message, "Message was null");
var protocol = message.getProtocol();
var dispatcher = getDispatcher(protocol);
var dispatcher = dispatchers.get(protocol);
if (dispatcher == null) {
return failedFuture(new EdcException("No provider dispatcher registered for protocol: " + protocol));
}
return dispatcher.send(responseType, message);
return dispatcher.dispatch(responseType, message);
}

@Nullable
private RemoteMessageDispatcher getDispatcher(@Nullable String protocol) {
if (protocol == null) {
return dispatchers.values().stream().findFirst()
.orElse(null);
}
return dispatchers.get(protocol);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.eclipse.edc.junit.testfixtures.TestUtils.getFreePort;
import static org.eclipse.edc.junit.testfixtures.TestUtils.testOkHttpClient;
import static org.eclipse.edc.spi.http.FallbackFactories.statusMustBe;
import static org.eclipse.edc.spi.http.FallbackFactories.statusMustBeSuccessful;
import static org.eclipse.edc.spi.http.FallbackFactories.retryWhenStatusIsNot;
import static org.eclipse.edc.spi.http.FallbackFactories.retryWhenStatusNot2xxOr4xx;
import static org.mockito.Mockito.mock;
import static org.mockserver.matchers.Times.once;
import static org.mockserver.matchers.Times.unlimited;
Expand Down Expand Up @@ -110,7 +110,7 @@ void execute_fallback_shouldFailAfterAttemptsExpired_whenResponseFails() {
}

@Test
void execute_fallback_shouldRetryIfStatusIsNotSuccessful() {
void execute_fallback_shouldRetryIfStatusIsNot2xxOr4xx() {
var client = clientWith(RetryPolicy.<Response>builder().withMaxAttempts(2).build());

var request = new Request.Builder()
Expand All @@ -119,7 +119,7 @@ void execute_fallback_shouldRetryIfStatusIsNotSuccessful() {

server.when(request(), unlimited()).respond(new HttpResponse().withStatusCode(500));

var result = client.execute(request, List.of(statusMustBeSuccessful()), handleResponse());
var result = client.execute(request, List.of(retryWhenStatusNot2xxOr4xx()), handleResponse());

assertThat(result).matches(Result::failed).extracting(Result::getFailureMessages).asList()
.first().asString().matches(it -> it.startsWith("Server response to"));
Expand All @@ -135,7 +135,7 @@ void execute_fallback_shouldRetryIfStatusIsNotAsExpected() {
.build();
server.when(request(), unlimited()).respond(new HttpResponse().withStatusCode(200));

var result = client.execute(request, List.of(statusMustBe(204)), handleResponse());
var result = client.execute(request, List.of(retryWhenStatusIsNot(204)), handleResponse());

assertThat(result).matches(Result::failed).extracting(Result::getFailureMessages).asList()
.first().asString().matches(it -> it.startsWith("Server response to"));
Expand Down Expand Up @@ -167,12 +167,44 @@ void executeAsync_fallback_shouldRetryIfStatusIsNotSuccessful() {

server.when(request(), unlimited()).respond(new HttpResponse().withStatusCode(500));

var result = client.executeAsync(request, List.of(statusMustBeSuccessful()), handleResponse());
var result = client.executeAsync(request, List.of(retryWhenStatusNot2xxOr4xx()), handleResponse());

assertThat(result).failsWithin(5, TimeUnit.SECONDS);
server.verify(request(), exactly(2));
}

@Test
void executeAsync_fallback_shouldRetryIfStatusIs4xx() {
var client = clientWith(RetryPolicy.<Response>builder().withMaxAttempts(2).build());

var request = new Request.Builder()
.url("http://localhost:" + port)
.build();

server.when(request(), unlimited()).respond(new HttpResponse().withStatusCode(500));

var result = client.executeAsync(request, List.of(retryWhenStatusNot2xxOr4xx()), handleResponse());

assertThat(result).failsWithin(5, TimeUnit.SECONDS);
server.verify(request(), exactly(2));
}

@Test
void executeAsync_fallback_shouldNotRetryIfStatusIsExpected() {
var client = clientWith(RetryPolicy.<Response>builder().withMaxAttempts(2).build());

var request = new Request.Builder()
.url("http://localhost:" + port)
.build();

server.when(request(), unlimited()).respond(new HttpResponse().withStatusCode(404));

var result = client.executeAsync(request, List.of(retryWhenStatusNot2xxOr4xx()), handleResponse());

assertThat(result).succeedsWithin(5, TimeUnit.SECONDS);
server.verify(request(), exactly(1));
}

@Test
void executeAsync_fallback_shouldRetryIfStatusIsNotAsExpected() {
var client = clientWith(RetryPolicy.<Response>builder().withMaxAttempts(2).build());
Expand All @@ -182,7 +214,7 @@ void executeAsync_fallback_shouldRetryIfStatusIsNotAsExpected() {
.build();
server.when(request(), unlimited()).respond(new HttpResponse().withStatusCode(200));

var result = client.executeAsync(request, List.of(statusMustBe(204)), handleResponse());
var result = client.executeAsync(request, List.of(retryWhenStatusIsNot(204)), handleResponse());

assertThat(result).failsWithin(5, TimeUnit.SECONDS);
server.verify(request(), exactly(2));
Expand Down Expand Up @@ -211,7 +243,11 @@ private static EdcHttpClientImpl clientWith(RetryPolicy<Response> retryPolicy) {
private Function<Response, Result<String>> handleResponse() {
return r -> {
try {
return Result.success(typeManager.readValue(r.body().string(), Map.class).get("message").toString());
if (r.isSuccessful()) {
return Result.success(typeManager.readValue(r.body().string(), Map.class).get("message").toString());
} else {
return Result.success(r.message());
}
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.edc.statemachine.retry;

import org.eclipse.edc.spi.EdcException;
import org.eclipse.edc.spi.entity.StatefulEntity;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.response.ResponseFailure;
import org.eclipse.edc.spi.response.StatusResult;

import java.time.Clock;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

/**
* Provides retry capabilities to an asynchronous process that returns a {@link CompletableFuture} with a {@link StatusResult} content
*/
public class AsyncStatusResultRetryProcess<E extends StatefulEntity<E>, C, SELF extends AsyncStatusResultRetryProcess<E, C, SELF>>
extends CompletableFutureRetryProcess<E, StatusResult<C>, SELF> {
private final Monitor monitor;
private BiConsumer<E, ResponseFailure> onFatalError;

public AsyncStatusResultRetryProcess(E entity, Supplier<CompletableFuture<StatusResult<C>>> process, Monitor monitor, Clock clock, EntityRetryProcessConfiguration configuration) {
super(entity, process, monitor, clock, configuration);
this.monitor = monitor;
}

@Override
public SELF onSuccess(BiConsumer<E, StatusResult<C>> onSuccessHandler) {
this.onSuccessHandler = (entity, result) -> {
new StatusResultRetryProcess<>(entity, () -> result, monitor, clock, configuration)
.onSuccess((e, c) -> onSuccessHandler.accept(e, StatusResult.success(c)))
.onFatalError(onFatalError)
.onRetryExhausted((e, failure) -> onRetryExhausted.accept(e, new EdcException(failure.getFailureDetail())))
.onFailure((e, failure) -> onFailureHandler.accept(e, new EdcException(failure.getFailureDetail())))
.process(entity, description);
};
return (SELF) this;
}

public SELF onFatalError(BiConsumer<E, ResponseFailure> onFatalError) {
this.onFatalError = onFatalError;
return (SELF) this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,14 @@
/**
* Provides retry capabilities to an asynchronous process that returns a {@link CompletableFuture} object
*/
public class CompletableFutureRetryProcess<E extends StatefulEntity<E>, C> extends RetryProcess<E, CompletableFutureRetryProcess<E, C>> {
public class CompletableFutureRetryProcess<E extends StatefulEntity<E>, C, SELF extends CompletableFutureRetryProcess<E, C, SELF>>
extends RetryProcess<E, CompletableFutureRetryProcess<E, C, SELF>> {
private final Supplier<CompletableFuture<C>> process;
private final Monitor monitor;
private Function<String, E> entityRetrieve;
private BiConsumer<E, C> onSuccessHandler;
private BiConsumer<E, Throwable> onFailureHandler;
private BiConsumer<E, Throwable> onRetryExhausted;
protected BiConsumer<E, C> onSuccessHandler;
protected BiConsumer<E, Throwable> onFailureHandler;
protected BiConsumer<E, Throwable> onRetryExhausted;

public CompletableFutureRetryProcess(E entity, Supplier<CompletableFuture<C>> process, Monitor monitor, Clock clock, EntityRetryProcessConfiguration configuration) {
super(entity, configuration, monitor, clock);
Expand Down Expand Up @@ -79,23 +80,23 @@ boolean process(E entity, String description) {
return true;
}

public CompletableFutureRetryProcess<E, C> onSuccess(BiConsumer<E, C> onSuccessHandler) {
public SELF onSuccess(BiConsumer<E, C> onSuccessHandler) {
this.onSuccessHandler = onSuccessHandler;
return this;
return (SELF) this;
}

public CompletableFutureRetryProcess<E, C> onFailure(BiConsumer<E, Throwable> onFailureHandler) {
public SELF onFailure(BiConsumer<E, Throwable> onFailureHandler) {
this.onFailureHandler = onFailureHandler;
return this;
return (SELF) this;
}

public CompletableFutureRetryProcess<E, C> entityRetrieve(Function<String, E> entityRetrieve) {
public SELF entityRetrieve(Function<String, E> entityRetrieve) {
this.entityRetrieve = entityRetrieve;
return this;
return (SELF) this;
}

public CompletableFutureRetryProcess<E, C> onRetryExhausted(BiConsumer<E, Throwable> onRetryExhausted) {
public SELF onRetryExhausted(BiConsumer<E, Throwable> onRetryExhausted) {
this.onRetryExhausted = onRetryExhausted;
return this;
return (SELF) this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,15 @@ public <T extends StatefulEntity<T>, C> StatusResultRetryProcess<T, C> doSyncPro
/**
* Initialize an asynchronous process that needs to be retried if it does not succeed
*/
public <T extends StatefulEntity<T>, C> CompletableFutureRetryProcess<T, C> doAsyncProcess(T entity, Supplier<CompletableFuture<C>> process) {
return new CompletableFutureRetryProcess<>(entity, process, monitor, clock, configuration);
public <T extends StatefulEntity<T>, C, SELF extends CompletableFutureRetryProcess<T, C, SELF>> SELF doAsyncProcess(T entity, Supplier<CompletableFuture<C>> process) {
return (SELF) new CompletableFutureRetryProcess<T, C, SELF>(entity, process, monitor, clock, configuration);
}

/**
* Initialize an asynchronous process that will return a {@link StatusResult} and it will need to be handled
*/
public <T extends StatefulEntity<T>, C, SELF extends AsyncStatusResultRetryProcess<T, C, SELF>> SELF doAsyncStatusResultProcess(T entity, Supplier<CompletableFuture<StatusResult<C>>> process) {
return (SELF) new AsyncStatusResultRetryProcess<T, C, SELF>(entity, process, monitor, clock, configuration);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,14 @@
* This works only used on a state machine, where states are persisted.
* The process is a unit of logic that can be executed on the entity.
*/
public abstract class RetryProcess<E extends StatefulEntity<E>, T extends RetryProcess<E, T>> {
public abstract class RetryProcess<E extends StatefulEntity<E>, SELF extends RetryProcess<E, SELF>> {

private final E entity;
private final EntityRetryProcessConfiguration configuration;
private final Monitor monitor;
private final Clock clock;
protected final EntityRetryProcessConfiguration configuration;
protected final Monitor monitor;
protected final Clock clock;
protected Consumer<E> onDelay;
protected String description;

protected RetryProcess(E entity, EntityRetryProcessConfiguration configuration, Monitor monitor, Clock clock) {
this.entity = entity;
Expand All @@ -53,6 +54,7 @@ protected RetryProcess(E entity, EntityRetryProcessConfiguration configuration,
* @return false if process should not be run yet, the result of the process otherwise.
*/
public boolean execute(String description) {
this.description = description;
if (isRetry(entity)) {
var delay = delayMillis(entity);
if (delay > 0) {
Expand All @@ -72,9 +74,9 @@ public boolean execute(String description) {
/**
* Handler that is called if the entity is not yet ready for processing
*/
public T onDelay(Consumer<E> onDelay) {
public SELF onDelay(Consumer<E> onDelay) {
this.onDelay = onDelay;
return (T) this;
return (SELF) this;
}

/**
Expand All @@ -96,7 +98,7 @@ private long delayMillis(E entity) {
delayStrategy.failures(entity.getStateCount() - 1);

// Get the delay time following the number of failures.
long waitMillis = delayStrategy.retryInMillis();
var waitMillis = delayStrategy.retryInMillis();

return entity.getStateTimestamp() + waitMillis - clock.millis();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ boolean process(E entity, String description) {
monitor.debug(format("%s: ID %s. %s", entity.getClass().getSimpleName(), entity.getId(), description));
var result = process.get();

handleResult(entity, description, result);

return true;
}

public void handleResult(E entity, String description, StatusResult<C> result) {
if (result.succeeded()) {
if (onSuccessHandler != null) {
onSuccessHandler.accept(entity, result.getContent());
Expand Down Expand Up @@ -86,8 +92,6 @@ boolean process(E entity, String description) {
}
}
}

return true;
}

public StatusResultRetryProcess<E, C> onSuccess(BiConsumer<E, C> onSuccessHandler) {
Expand Down
Loading

0 comments on commit 176aedd

Please sign in to comment.