[BUG] Apache HttpClient5 default configuration allocates 100mb byte buffer per request #619

Closed
johhud1 opened this issue Sep 12, 2023 · 2 comments · Fixed by #620
Labels: backport 2.x (Backport to 2.x branch), bug (Something isn't working)

johhud1 commented Sep 12, 2023

What is the bug?

The default Apache HttpClient5 configuration (which uses the HttpAsyncResponseConsumerFactory implementation HeapBufferedResponseConsumerFactory) allocates a 100 MB byte buffer for every OpenSearch request. This results in very high CPU usage and poor performance. The problem seems to be due to the implementation of HttpAsyncResponseConsumerFactory and HeapBufferedAsyncEntityConsumer.java#L91.
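
For a rough sense of scale, the arithmetic below estimates how much memory would have to be allocated (and then zero-filled and garbage-collected by the JVM) per second if every request reserves the full buffer up front. This is only a back-of-the-envelope sketch: the request rates are hypothetical, and the buffer size assumes "100mb" means 100 * 1024 * 1024 bytes.

```java
// Back-of-the-envelope estimate; the request rates are hypothetical.
class AllocationEstimate {
    // Assumption: the "100mb" buffer is 100 * 1024 * 1024 bytes.
    static final long BUFFER_BYTES = 100L * 1024 * 1024;

    /** Bytes allocated per second if every request reserves the full buffer. */
    static long bytesPerSecond(long requestsPerSecond) {
        return BUFFER_BYTES * requestsPerSecond;
    }

    public static void main(String[] args) {
        for (long rate : new long[] {1, 10, 100}) {
            double gib = bytesPerSecond(rate) / (1024.0 * 1024 * 1024);
            System.out.printf("%d req/s -> %.2f GiB allocated per second%n", rate, gib);
        }
    }
}
```

Under these assumptions, 100 requests per second would mean close to 10 GiB of short-lived buffers per second, regardless of how small the actual responses are.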

How can one reproduce the bug?

Construct an Apache HttpClient5-based OpenSearchTransport and client object like so:

        final OpenSearchTransport transport =
            ApacheHttpClient5TransportBuilder
                .builder(HttpHost.create(hostName))
                .setMapper(new JacksonJsonpMapper())
                .build();
        OpenSearchClient client = new OpenSearchClient(transport);

and execute the client.search(..) method.

What is the expected behavior?

The application should not allocate a 100 MB byte buffer for every request.

Here's an example implementation that does not suffer from this problem and, in testing, reduced CPU usage by 10x.

import java.io.IOException;

import org.apache.hc.core5.http.*;
import org.apache.hc.core5.http.io.entity.ByteArrayEntity;
import org.apache.hc.core5.http.message.BasicClassicHttpResponse;
import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
import org.apache.hc.core5.http.nio.entity.BasicAsyncEntityConsumer;
import org.apache.hc.core5.http.nio.support.AbstractAsyncResponseConsumer;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.opensearch.client.json.JsonData;
import org.opensearch.client.json.jackson.JacksonJsonpMapper;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch._types.FieldValue;
import org.opensearch.client.opensearch._types.OpenSearchException;
import org.opensearch.client.opensearch._types.query_dsl.*;
import org.opensearch.client.opensearch.core.SearchRequest;
import org.opensearch.client.opensearch.core.SearchResponse;
import org.opensearch.client.opensearch.core.search.*;
import org.opensearch.client.transport.OpenSearchTransport;
import org.opensearch.client.transport.httpclient5.*;

   class MyHeapBufferedResponseConsumerFactory implements HttpAsyncResponseConsumerFactory {

        /**
         * Creates a factory whose consumers buffer the response without a preallocated buffer limit.
         */
        public MyHeapBufferedResponseConsumerFactory() {
        }

        /**
         * Creates the {@link AsyncResponseConsumer}, called once per request attempt.
         */
        @Override
        public AsyncResponseConsumer<ClassicHttpResponse> createHttpAsyncResponseConsumer() {
            return new MyHeapBufferedAsyncResponseConsumer();
        }
    }

    class MyHeapBufferedAsyncResponseConsumer extends AbstractAsyncResponseConsumer<ClassicHttpResponse, byte[]> {

        /**
         * Creates a new instance of this consumer, backed by a {@link BasicAsyncEntityConsumer}
         * that collects the response body into a byte array.
         */
        public MyHeapBufferedAsyncResponseConsumer() {
            super(new BasicAsyncEntityConsumer());
        }

        /**
         * Triggered to signal receipt of an intermediate (1xx) HTTP response.
         *
         * @param response the intermediate (1xx) HTTP response.
         * @param context the actual execution context.
         */
        @Override
        public void informationResponse(final HttpResponse response, final HttpContext context) throws HttpException, IOException {}

        /**
         * Triggered to generate object that represents a result of response message processing.
         * @param response the response message.
         * @param entity the response entity.
         * @param contentType the response content type.
         * @return the result of response processing.
         */
        @Override
        protected ClassicHttpResponse buildResult(final HttpResponse response, final byte[] entity, final ContentType contentType) {
            final ClassicHttpResponse classicResponse = new BasicClassicHttpResponse(response.getCode());
            classicResponse.setVersion(response.getVersion());
            classicResponse.setHeaders(response.getHeaders());
            classicResponse.setReasonPhrase(response.getReasonPhrase());
            if (response.getLocale() != null) {
                classicResponse.setLocale(response.getLocale());
            }

            if (entity != null) {
                String encoding = null;

                try {
                    final Header contentEncoding = response.getHeader(HttpHeaders.CONTENT_ENCODING);
                    if (contentEncoding != null) {
                        encoding = contentEncoding.getValue();
                    }
                } catch (final HttpException ex) {
                    // Ignored: if the header cannot be read, the entity is built without a content encoding.
                }

                final ByteArrayEntity httpEntity = new ByteArrayEntity(entity, contentType, encoding);
                classicResponse.setEntity(httpEntity);
            }

            return classicResponse;
        }
    }
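    // "Example" is a hypothetical wrapper class so that main has an enclosing type.
    class Example {
        public static void main(String[] args) throws Exception {
            // Placeholder: the host to connect to, here taken from the first argument.
            final String hostName = args[0];

            ApacheHttpClient5Options.Builder optionsBuilder = ApacheHttpClient5Options.DEFAULT.toBuilder();
            optionsBuilder.setHttpAsyncResponseConsumerFactory(new MyHeapBufferedResponseConsumerFactory());

            final OpenSearchTransport transport =
                ApacheHttpClient5TransportBuilder
                    .builder(HttpHost.create(hostName))
                    .setMapper(new JacksonJsonpMapper())
                    .setOptions(optionsBuilder.build())
                    .build();
            OpenSearchClient client = new OpenSearchClient(transport);
        }
    }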
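
The general idea of not reserving the full limit up front can also be sketched with the standard library alone. The class below is illustrative only: it is not the client's code and not the upstream fix, and the name GrowableBuffer and its 1 KiB starting size are hypothetical. It starts small, doubles its capacity as data arrives, and still rejects bodies larger than a configured limit.

```java
import java.util.Arrays;

// Illustrative only: not the client's code and not the upstream fix.
class GrowableBuffer {
    private static final int INITIAL_CAPACITY = 1024; // hypothetical starting size
    private final int limit;
    private byte[] data;
    private int length;

    GrowableBuffer(int limit) {
        if (limit <= 0) {
            throw new IllegalArgumentException("limit must be greater than 0");
        }
        this.limit = limit;
        // Allocate a small array now instead of reserving the whole limit.
        this.data = new byte[Math.min(INITIAL_CAPACITY, limit)];
    }

    void append(byte[] chunk) {
        long required = (long) length + chunk.length;
        if (required > limit) {
            throw new IllegalStateException("content exceeds the limit of " + limit + " bytes");
        }
        if (required > data.length) {
            // Double the capacity until the chunk fits, never exceeding the limit.
            long newCapacity = data.length;
            while (newCapacity < required) {
                newCapacity *= 2;
            }
            data = Arrays.copyOf(data, (int) Math.min(newCapacity, limit));
        }
        System.arraycopy(chunk, 0, data, length, chunk.length);
        length += chunk.length;
    }

    int capacity() {
        return data.length;
    }

    byte[] toByteArray() {
        return Arrays.copyOf(data, length);
    }

    public static void main(String[] args) {
        GrowableBuffer buffer = new GrowableBuffer(100 * 1024 * 1024);
        buffer.append(new byte[3000]); // a small response body
        System.out.println("bytes stored: " + buffer.toByteArray().length);
        System.out.println("capacity:     " + buffer.capacity());
    }
}
```

With this approach the memory reserved tracks the size of the response actually received, while the limit still guards against unexpectedly large bodies.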

What is your host/environment?

OS: Fedora release 38 (Thirty Eight) x86_64

Do you have any screenshots?

No

Do you have any additional context?

Apologies if there are compiler errors in the example implementation above; it was cobbled together from an internal project, but I hope it is enough to make the issue clear. Please let me know if there's more info I can provide.

johhud1 added the bug and untriaged labels Sep 12, 2023
reta (Collaborator) commented Sep 12, 2023

@johhud1 thank you. The issue has been identified and a fix is on the way (please see opensearch-project/OpenSearch#9993). As a workaround, you could provide your own HttpAsyncResponseConsumerFactory with smaller limits.

reta self-assigned this Sep 12, 2023
reta removed the untriaged label Sep 12, 2023
reta (Collaborator) commented Sep 12, 2023

Related to opensearch-project/OpenSearch#9866
