diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java index c86ef5814..95829d873 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java @@ -21,6 +21,9 @@ import java.net.URLClassLoader; import java.util.Objects; import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Level; +import java.util.logging.Logger; +import okhttp3.OkHttpClient; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders; @@ -68,6 +71,10 @@ public static void main(StreamExecutionEnvironment env, StatefulFunctionsConfig env.getConfig().enableObjectReuse(); + // Enable fine-grained logging for OkHttp to get details about unclosed connections when they + // occur + Logger.getLogger(OkHttpClient.class.getName()).setLevel(Level.FINE); + final StatefulFunctionsUniverse statefulFunctionsUniverse = StatefulFunctionsUniverses.get( Thread.currentThread().getContextClassLoader(), stateFunConfig); diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClient.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClient.java index f2cf7951a..4457b54b1 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClient.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClient.java @@ -71,11 +71,15 @@ public CompletableFuture call( } private static FromFunction parseResponse(Response response) { - final InputStream httpResponseBody = responseBody(response); try { - return parseProtobufOrThrow(FromFunction.parser(), httpResponseBody); + final InputStream httpResponseBody = responseBody(response); + try { + return parseProtobufOrThrow(FromFunction.parser(), httpResponseBody); + } finally { + IOUtils.closeQuietly(httpResponseBody); + } } finally { - IOUtils.closeQuietly(httpResponseBody); + response.close(); } } diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/RetryingCallback.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/RetryingCallback.java index ffb7f7bb7..004a014c9 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/RetryingCallback.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/RetryingCallback.java @@ -30,6 +30,7 @@ import okhttp3.Call; import okhttp3.Callback; import okhttp3.Response; +import okhttp3.ResponseBody; import okio.Timeout; import org.apache.flink.statefun.flink.core.backpressure.BoundedExponentialBackoff; import org.apache.flink.statefun.flink.core.metrics.RemoteInvocationMetrics; @@ -90,6 +91,7 @@ private void onFailureUnsafe(Call call, IOException cause) { if (isShutdown.getAsBoolean()) { throw new IllegalStateException("An exception caught during shutdown.", cause); } + LOG.warn( "Retriable exception caught while trying to deliver a message: " + requestSummary, cause); metrics.remoteInvocationFailures(); @@ -105,9 +107,48 @@ private void onResponseUnsafe(Call call, Response response) { resultFuture.complete(response); return; } - if (!RETRYABLE_HTTP_CODES.contains(response.code()) && response.code() < 500) { + + boolean isRetryable = RETRYABLE_HTTP_CODES.contains(response.code()); + + String prefixString = + isRetryable + ? "Non-successful, retryable HTTP response code " + response.code() + " received" + : "Non-successful, non-retryable HTTP response code " + response.code() + " received"; + + try { + ResponseBody body = response.body(); + if (body == null) { + String errorMessage = prefixString + " and the response body is null."; + if (isRetryable) { + LOG.warn(errorMessage); + } else { + LOG.error(errorMessage); + } + } else { + String bodyText = body.string(); + String errorMessage = prefixString + " with body \"" + bodyText + "\"."; + if (isRetryable) { + LOG.warn(errorMessage); + } else { + LOG.error(errorMessage); + } + } + } catch (IOException exception) { + String errorMessage = + prefixString + " and encountered an IOException while reading the body."; + if (isRetryable) { + LOG.warn(errorMessage); + } else { + LOG.error(errorMessage); + } + } + + response.close(); + + if (!isRetryable && response.code() < 500) { throw new IllegalStateException("Non successful HTTP response code " + response.code()); } + if (!retryAfterApplyingBackoff(call)) { throw new IllegalStateException( "Maximal request time has elapsed. Last known error is: invalid HTTP response code "