From 8556d6329e4563a323faf679c5cb53aaa90910c4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fr=C3=A9d=C3=A9rique=20Mittelstaedt?= Date: Thu, 31 Aug 2023 12:50:20 +0100 Subject: [PATCH 1/2] Fix unclosed requests --- .../flink/core/StatefulFunctionsJob.java | 7 ++++ .../httpfn/DefaultHttpRequestReplyClient.java | 10 +++-- .../flink/core/httpfn/RetryingCallback.java | 42 ++++++++++++++++++- 3 files changed, 55 insertions(+), 4 deletions(-) diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java index c86ef5814..95829d873 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java @@ -21,6 +21,9 @@ import java.net.URLClassLoader; import java.util.Objects; import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Level; +import java.util.logging.Logger; +import okhttp3.OkHttpClient; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders; @@ -68,6 +71,10 @@ public static void main(StreamExecutionEnvironment env, StatefulFunctionsConfig env.getConfig().enableObjectReuse(); + // Enable fine-grained logging for OkHttp to get details about unclosed connections when they + // occur + Logger.getLogger(OkHttpClient.class.getName()).setLevel(Level.FINE); + final StatefulFunctionsUniverse statefulFunctionsUniverse = StatefulFunctionsUniverses.get( Thread.currentThread().getContextClassLoader(), stateFunConfig); diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClient.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClient.java index f2cf7951a..4457b54b1 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClient.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClient.java @@ -71,11 +71,15 @@ public CompletableFuture call( } private static FromFunction parseResponse(Response response) { - final InputStream httpResponseBody = responseBody(response); try { - return parseProtobufOrThrow(FromFunction.parser(), httpResponseBody); + final InputStream httpResponseBody = responseBody(response); + try { + return parseProtobufOrThrow(FromFunction.parser(), httpResponseBody); + } finally { + IOUtils.closeQuietly(httpResponseBody); + } } finally { - IOUtils.closeQuietly(httpResponseBody); + response.close(); } } diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/RetryingCallback.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/RetryingCallback.java index ffb7f7bb7..87c4705bc 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/RetryingCallback.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/RetryingCallback.java @@ -90,6 +90,7 @@ private void onFailureUnsafe(Call call, IOException cause) { if (isShutdown.getAsBoolean()) { throw new IllegalStateException("An exception caught during shutdown.", cause); } + LOG.warn( "Retriable exception caught while trying to deliver a message: " + requestSummary, cause); metrics.remoteInvocationFailures(); @@ -105,9 +106,48 @@ private void onResponseUnsafe(Call call, Response response) { resultFuture.complete(response); return; } - if (!RETRYABLE_HTTP_CODES.contains(response.code()) && response.code() < 500) { + + boolean isRetryable = RETRYABLE_HTTP_CODES.contains(response.code()); + + String prefixString = + isRetryable + ? "Non-successful, retryable HTTP response code " + response.code() + " received" + : "Non-successful, non-retryable HTTP response code " + response.code() + " received"; + + try { + ResponseBody body = response.body(); + if (body == null) { + String errorMessage = prefixString + " and the response body is null."; + if (isRetryable) { + LOG.warn(errorMessage); + } else { + LOG.error(errorMessage); + } + } else { + String bodyText = body.string(); + String errorMessage = prefixString + " with body \"" + bodyText + "\"."; + if (isRetryable) { + LOG.warn(errorMessage); + } else { + LOG.error(errorMessage); + } + } + } catch (IOException exception) { + String errorMessage = + prefixString + " and encountered an IOException while reading the body."; + if (isRetryable) { + LOG.warn(errorMessage); + } else { + LOG.error(errorMessage); + } + } + + response.close(); + + if (!isRetryable && response.code() < 500) { throw new IllegalStateException("Non successful HTTP response code " + response.code()); } + if (!retryAfterApplyingBackoff(call)) { throw new IllegalStateException( "Maximal request time has elapsed. Last known error is: invalid HTTP response code " From dd9591a4aca60699d067f1fa20cecd6bcdb0c45a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fr=C3=A9d=C3=A9rique=20Mittelstaedt?= Date: Mon, 4 Sep 2023 10:23:25 +0100 Subject: [PATCH 2/2] add missing import --- .../flink/statefun/flink/core/httpfn/RetryingCallback.java | 1 + 1 file changed, 1 insertion(+) diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/RetryingCallback.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/RetryingCallback.java index 87c4705bc..004a014c9 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/RetryingCallback.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/RetryingCallback.java @@ -30,6 +30,7 @@ import okhttp3.Call; import okhttp3.Callback; import okhttp3.Response; +import okhttp3.ResponseBody; import okio.Timeout; import org.apache.flink.statefun.flink.core.backpressure.BoundedExponentialBackoff; import org.apache.flink.statefun.flink.core.metrics.RemoteInvocationMetrics;