Skip to content

Fix unclosed requests #334

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import java.net.URLClassLoader;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
import okhttp3.OkHttpClient;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
Expand Down Expand Up @@ -68,6 +71,10 @@ public static void main(StreamExecutionEnvironment env, StatefulFunctionsConfig

env.getConfig().enableObjectReuse();

// Enable fine-grained logging for OkHttp to get details about unclosed connections when they
// occur
Logger.getLogger(OkHttpClient.class.getName()).setLevel(Level.FINE);

final StatefulFunctionsUniverse statefulFunctionsUniverse =
StatefulFunctionsUniverses.get(
Thread.currentThread().getContextClassLoader(), stateFunConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,15 @@ public CompletableFuture<FromFunction> call(
}

private static FromFunction parseResponse(Response response) {
final InputStream httpResponseBody = responseBody(response);
try {
return parseProtobufOrThrow(FromFunction.parser(), httpResponseBody);
final InputStream httpResponseBody = responseBody(response);
try {
return parseProtobufOrThrow(FromFunction.parser(), httpResponseBody);
} finally {
IOUtils.closeQuietly(httpResponseBody);
}
} finally {
IOUtils.closeQuietly(httpResponseBody);
response.close();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.Response;
import okhttp3.ResponseBody;
import okio.Timeout;
import org.apache.flink.statefun.flink.core.backpressure.BoundedExponentialBackoff;
import org.apache.flink.statefun.flink.core.metrics.RemoteInvocationMetrics;
Expand Down Expand Up @@ -90,6 +91,7 @@ private void onFailureUnsafe(Call call, IOException cause) {
if (isShutdown.getAsBoolean()) {
throw new IllegalStateException("An exception caught during shutdown.", cause);
}

LOG.warn(
"Retriable exception caught while trying to deliver a message: " + requestSummary, cause);
metrics.remoteInvocationFailures();
Expand All @@ -105,9 +107,48 @@ private void onResponseUnsafe(Call call, Response response) {
resultFuture.complete(response);
return;
}
if (!RETRYABLE_HTTP_CODES.contains(response.code()) && response.code() < 500) {

boolean isRetryable = RETRYABLE_HTTP_CODES.contains(response.code());

String prefixString =
isRetryable
? "Non-successful, retryable HTTP response code " + response.code() + " received"
: "Non-successful, non-retryable HTTP response code " + response.code() + " received";

try {
ResponseBody body = response.body();
if (body == null) {
String errorMessage = prefixString + " and the response body is null.";
if (isRetryable) {
LOG.warn(errorMessage);
} else {
LOG.error(errorMessage);
}
} else {
String bodyText = body.string();
String errorMessage = prefixString + " with body \"" + bodyText + "\".";
if (isRetryable) {
LOG.warn(errorMessage);
} else {
LOG.error(errorMessage);
}
}
} catch (IOException exception) {
String errorMessage =
prefixString + " and encountered an IOException while reading the body.";
if (isRetryable) {
LOG.warn(errorMessage);
} else {
LOG.error(errorMessage);
}
}

response.close();

if (!isRetryable && response.code() < 500) {
throw new IllegalStateException("Non successful HTTP response code " + response.code());
}

if (!retryAfterApplyingBackoff(call)) {
throw new IllegalStateException(
"Maximal request time has elapsed. Last known error is: invalid HTTP response code "
Expand Down