From f44d29c9d050ea86dc0e4f5332de2e9d1ff042e2 Mon Sep 17 00:00:00 2001 From: Georgios Andrianakis Date: Thu, 7 Mar 2024 17:19:56 +0200 Subject: [PATCH] Properly support sending InputStream in REST Client This means not keeping the entire content of the stream in memory --- .../client/reactive/SendInputStreamTest.java | 58 ++ .../SendRequestScopedInputStreamTest.java | 118 +++ .../test/resources/larger-than-chunk-size.txt | 863 ++++++++++++++++++ ...end-request-scoped-input-stream.properties | 1 + .../handlers/ClientSendRequestHandler.java | 24 +- .../client/impl/InputStreamReadStream.java | 179 ++++ .../client/impl/RestClientRequestContext.java | 4 + 7 files changed, 1245 insertions(+), 2 deletions(-) create mode 100644 extensions/resteasy-reactive/rest-client-reactive/deployment/src/test/java/io/quarkus/rest/client/reactive/SendInputStreamTest.java create mode 100644 extensions/resteasy-reactive/rest-client-reactive/deployment/src/test/java/io/quarkus/rest/client/reactive/SendRequestScopedInputStreamTest.java create mode 100644 extensions/resteasy-reactive/rest-client-reactive/deployment/src/test/resources/larger-than-chunk-size.txt create mode 100644 extensions/resteasy-reactive/rest-client-reactive/deployment/src/test/resources/send-request-scoped-input-stream.properties create mode 100644 independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/InputStreamReadStream.java diff --git a/extensions/resteasy-reactive/rest-client-reactive/deployment/src/test/java/io/quarkus/rest/client/reactive/SendInputStreamTest.java b/extensions/resteasy-reactive/rest-client-reactive/deployment/src/test/java/io/quarkus/rest/client/reactive/SendInputStreamTest.java new file mode 100644 index 0000000000000..a89154993b8b8 --- /dev/null +++ b/extensions/resteasy-reactive/rest-client-reactive/deployment/src/test/java/io/quarkus/rest/client/reactive/SendInputStreamTest.java @@ -0,0 +1,58 @@ +package io.quarkus.rest.client.reactive; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.InputStream; +import java.net.URI; + +import jakarta.ws.rs.POST; +import jakarta.ws.rs.Path; + +import org.eclipse.microprofile.rest.client.RestClientBuilder; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.test.QuarkusUnitTest; +import io.quarkus.test.common.http.TestHTTPResource; + +public class SendInputStreamTest { + + @RegisterExtension + static final QuarkusUnitTest TEST = new QuarkusUnitTest(); + + private final File FILE = new File("./src/test/resources/larger-than-chunk-size.txt"); + + @TestHTTPResource + URI uri; + + @Test + public void test() throws FileNotFoundException { + Client client = RestClientBuilder.newBuilder().baseUri(uri).build(Client.class); + + InputStream is = new FileInputStream(FILE); + long result = client.count(is); + + assertEquals(FILE.length(), result); + } + + @Path("test") + public interface Client { + + @POST + @Path("count") + long count(InputStream is); + } + + @Path("test") + public static class Resource { + + @POST + @Path("count") + public long count(String input) { + return input.length(); + } + } +} diff --git a/extensions/resteasy-reactive/rest-client-reactive/deployment/src/test/java/io/quarkus/rest/client/reactive/SendRequestScopedInputStreamTest.java b/extensions/resteasy-reactive/rest-client-reactive/deployment/src/test/java/io/quarkus/rest/client/reactive/SendRequestScopedInputStreamTest.java new file mode 100644 index 0000000000000..03c7dfcb64d38 --- /dev/null +++ b/extensions/resteasy-reactive/rest-client-reactive/deployment/src/test/java/io/quarkus/rest/client/reactive/SendRequestScopedInputStreamTest.java @@ -0,0 +1,118 @@ +package io.quarkus.rest.client.reactive; + +import static io.restassured.RestAssured.when; +import static org.hamcrest.Matchers.equalTo; + +import java.io.IOException; +import java.io.InputStream; +import java.util.concurrent.atomic.AtomicLong; + +import jakarta.enterprise.context.RequestScoped; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.POST; +import jakarta.ws.rs.Path; + +import org.eclipse.microprofile.rest.client.inject.RegisterRestClient; +import org.eclipse.microprofile.rest.client.inject.RestClient; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.runtime.BlockingOperationControl; +import io.quarkus.runtime.BlockingOperationNotAllowedException; +import io.quarkus.test.QuarkusUnitTest; +import io.smallrye.mutiny.Uni; + +public class SendRequestScopedInputStreamTest { + + @RegisterExtension + static final QuarkusUnitTest TEST = new QuarkusUnitTest() + .withConfigurationResource("send-request-scoped-input-stream.properties"); + + @Test + public void test() { + when() + .get("test/in") + .then() + .statusCode(200) + .body(equalTo("" + CustomInputStream.MAX_ITERATIONS)); + + // make sure the request scope of the stream is coming into play + when() + .get("test/in") + .then() + .statusCode(200) + .body(equalTo("" + CustomInputStream.MAX_ITERATIONS)); + + when() + .get("test/uniIn") + .then() + .statusCode(200) + .body(equalTo("" + CustomInputStream.MAX_ITERATIONS)); + } + + @RegisterRestClient(configKey = "test") + @Path("test") + public interface Client { + + @Path("out") + @POST + long count(InputStream is); + + @Path("out") + @POST + Uni uniCount(InputStream is); + } + + @Path("test") + public static class Resource { + + private final CustomInputStream is; + private final Client client; + + public Resource(CustomInputStream is, @RestClient Client client) { + this.is = is; + this.client = client; + } + + @Path("in") + @GET + public long in() { + return client.count(is); + } + + @Path("uniIn") + @GET + public Uni uniIn() { + return client.uniCount(is); + } + + @Path("out") + @POST + public long out(String input) { + return input.length(); + } + } + + @RequestScoped + public static class CustomInputStream extends InputStream { + + private static final int MAX_ITERATIONS = 100; + private final AtomicLong count = new AtomicLong(); + + @Override + public int read() throws IOException { + if (!BlockingOperationControl.isBlockingAllowed()) { + throw new BlockingOperationNotAllowedException("The read method of the stream was called an event loop thread"); + } + if (count.incrementAndGet() <= MAX_ITERATIONS) { + try { + Thread.sleep(20); + } catch (InterruptedException ignored) { + + } + return 'A'; // we don't really care about what we return + } + return -1; // signal the end of the stream + } + } +} diff --git a/extensions/resteasy-reactive/rest-client-reactive/deployment/src/test/resources/larger-than-chunk-size.txt b/extensions/resteasy-reactive/rest-client-reactive/deployment/src/test/resources/larger-than-chunk-size.txt new file mode 100644 index 0000000000000..81f2eed9a4800 --- /dev/null +++ b/extensions/resteasy-reactive/rest-client-reactive/deployment/src/test/resources/larger-than-chunk-size.txt @@ -0,0 +1,863 @@ +File-Date: 2018-04-23 +%% +Type: language +Subtag: aa +Description: Afar +Added: 2005-10-16 +%% +Type: language +Subtag: ab +Description: Abkhazian +Added: 2005-10-16 +Suppress-Script: Cyrl +%% +Type: language +Subtag: ae +Description: Avestan +Added: 2005-10-16 +%% +Type: language +Subtag: af +Description: Afrikaans +Added: 2005-10-16 +Suppress-Script: Latn +%% +Type: language +Subtag: ak +Description: Akan +Added: 2005-10-16 +Scope: macrolanguage +%% +Type: language +Subtag: am +Description: Amharic +Added: 2005-10-16 +Suppress-Script: Ethi +%% +Type: language +Subtag: an +Description: Aragonese +Added: 2005-10-16 +%% +Type: language +Subtag: ar +Description: Arabic +Added: 2005-10-16 +Suppress-Script: Arab +Scope: macrolanguage +%% +Type: language +Subtag: as +Description: Assamese +Added: 2005-10-16 +Suppress-Script: Beng +%% +Type: language +Subtag: av +Description: Avaric +Added: 2005-10-16 +%% +Type: language +Subtag: ay +Description: Aymara +Added: 2005-10-16 +Suppress-Script: Latn +Scope: macrolanguage +%% +Type: language +Subtag: az +Description: Azerbaijani +Added: 2005-10-16 +Scope: macrolanguage +%% +Type: language +Subtag: ba +Description: Bashkir +Added: 2005-10-16 +%% +Type: language +Subtag: be +Description: Belarusian +Added: 2005-10-16 +Suppress-Script: Cyrl +%% +Type: language +Subtag: bg +Description: Bulgarian +Added: 2005-10-16 +Suppress-Script: Cyrl +%% +Type: language +Subtag: bh +Description: Bihari languages +Added: 2005-10-16 +Scope: collection +%% +Type: language +Subtag: bi +Description: Bislama +Added: 2005-10-16 +%% +Type: language +Subtag: bm +Description: Bambara +Added: 2005-10-16 +%% +Type: language +Subtag: bn +Description: Bengali +Description: Bangla +Added: 2005-10-16 +Suppress-Script: Beng +%% +Type: language +Subtag: bo +Description: Tibetan +Added: 2005-10-16 +%% +Type: language +Subtag: br +Description: Breton +Added: 2005-10-16 +%% +Type: language +Subtag: bs +Description: Bosnian +Added: 2005-10-16 +Suppress-Script: Latn +Macrolanguage: sh +%% +Type: language +Subtag: ca +Description: Catalan +Description: Valencian +Added: 2005-10-16 +Suppress-Script: Latn +%% +Type: language +Subtag: ce +Description: +%% +Type: language +Subtag: ce +Description: + +File-Date: 2018-04-23 +%% +Type: language +Subtag: aa +Description: Afar +Added: 2005-10-16 +%% +Type: language +Subtag: ab +Description: Abkhazian +Added: 2005-10-16 +Suppress-Script: Cyrl +%% +Type: language +Subtag: ae +Description: Avestan +Added: 2005-10-16 +%% +Type: language +Subtag: af +Description: Afrikaans +Added: 2005-10-16 +Suppress-Script: Latn +%% +Type: language +Subtag: ak +Description: Akan +Added: 2005-10-16 +Scope: macrolanguage +%% +Type: language +Subtag: am +Description: Amharic +Added: 2005-10-16 +Suppress-Script: Ethi +%% +Type: language +Subtag: an +Description: Aragonese +Added: 2005-10-16 +%% +Type: language +Subtag: ar +Description: Arabic +Added: 2005-10-16 +Suppress-Script: Arab +Scope: macrolanguage +%% +Type: language +Subtag: as +Description: Assamese +Added: 2005-10-16 +Suppress-Script: Beng +%% +Type: language +Subtag: av +Description: Avaric +Added: 2005-10-16 +%% +Type: language +Subtag: ay +Description: Aymara +Added: 2005-10-16 +Suppress-Script: Latn +Scope: macrolanguage +%% +Type: language +Subtag: az +Description: Azerbaijani +Added: 2005-10-16 +Scope: macrolanguage +%% +Type: language +Subtag: ba +Description: Bashkir +Added: 2005-10-16 +%% +Type: language +Subtag: be +Description: Belarusian +Added: 2005-10-16 +Suppress-Script: Cyrl +%% +Type: language +Subtag: bg +Description: Bulgarian +Added: 2005-10-16 +Suppress-Script: Cyrl +%% +Type: language +Subtag: bh +Description: Bihari languages +Added: 2005-10-16 +Scope: collection +%% +Type: language +Subtag: bi +Description: Bislama +Added: 2005-10-16 +%% +Type: language +Subtag: bm +Description: Bambara +Added: 2005-10-16 +%% +Type: language +Subtag: bn +Description: Bengali +Description: Bangla +Added: 2005-10-16 +Suppress-Script: Beng +%% +Type: language +Subtag: bo +Description: Tibetan +Added: 2005-10-16 +%% +Type: language +Subtag: br +Description: Breton +Added: 2005-10-16 +%% +Type: language +Subtag: bs +Description: Bosnian +Added: 2005-10-16 +Suppress-Script: Latn +Macrolanguage: sh +%% +Type: language +Subtag: ca +Description: Catalan +Description: Valencian +Added: 2005-10-16 +Suppress-Script: Latn +%% +Type: language +Subtag: ce +Description: +%% +Type: language +Subtag: ce +Description: + +File-Date: 2018-04-23 +%% +Type: language +Subtag: aa +Description: Afar +Added: 2005-10-16 +%% +Type: language +Subtag: ab +Description: Abkhazian +Added: 2005-10-16 +Suppress-Script: Cyrl +%% +Type: language +Subtag: ae +Description: Avestan +Added: 2005-10-16 +%% +Type: language +Subtag: af +Description: Afrikaans +Added: 2005-10-16 +Suppress-Script: Latn +%% +Type: language +Subtag: ak +Description: Akan +Added: 2005-10-16 +Scope: macrolanguage +%% +Type: language +Subtag: am +Description: Amharic +Added: 2005-10-16 +Suppress-Script: Ethi +%% +Type: language +Subtag: an +Description: Aragonese +Added: 2005-10-16 +%% +Type: language +Subtag: ar +Description: Arabic +Added: 2005-10-16 +Suppress-Script: Arab +Scope: macrolanguage +%% +Type: language +Subtag: as +Description: Assamese +Added: 2005-10-16 +Suppress-Script: Beng +%% +Type: language +Subtag: av +Description: Avaric +Added: 2005-10-16 +%% +Type: language +Subtag: ay +Description: Aymara +Added: 2005-10-16 +Suppress-Script: Latn +Scope: macrolanguage +%% +Type: language +Subtag: az +Description: Azerbaijani +Added: 2005-10-16 +Scope: macrolanguage +%% +Type: language +Subtag: ba +Description: Bashkir +Added: 2005-10-16 +%% +Type: language +Subtag: be +Description: Belarusian +Added: 2005-10-16 +Suppress-Script: Cyrl +%% +Type: language +Subtag: bg +Description: Bulgarian +Added: 2005-10-16 +Suppress-Script: Cyrl +%% +Type: language +Subtag: bh +Description: Bihari languages +Added: 2005-10-16 +Scope: collection +%% +Type: language +Subtag: bi +Description: Bislama +Added: 2005-10-16 +%% +Type: language +Subtag: bm +Description: Bambara +Added: 2005-10-16 +%% +Type: language +Subtag: bn +Description: Bengali +Description: Bangla +Added: 2005-10-16 +Suppress-Script: Beng +%% +Type: language +Subtag: bo +Description: Tibetan +Added: 2005-10-16 +%% +Type: language +Subtag: br +Description: Breton +Added: 2005-10-16 +%% +Type: language +Subtag: bs +Description: Bosnian +Added: 2005-10-16 +Suppress-Script: Latn +Macrolanguage: sh +%% +Type: language +Subtag: ca +Description: Catalan +Description: Valencian +Added: 2005-10-16 +Suppress-Script: Latn +%% +Type: language +Subtag: ce +Description: +%% +Type: language +Subtag: ce +Description: + +File-Date: 2018-04-23 +%% +Type: language +Subtag: aa +Description: Afar +Added: 2005-10-16 +%% +Type: language +Subtag: ab +Description: Abkhazian +Added: 2005-10-16 +Suppress-Script: Cyrl +%% +Type: language +Subtag: ae +Description: Avestan +Added: 2005-10-16 +%% +Type: language +Subtag: af +Description: Afrikaans +Added: 2005-10-16 +Suppress-Script: Latn +%% +Type: language +Subtag: ak +Description: Akan +Added: 2005-10-16 +Scope: macrolanguage +%% +Type: language +Subtag: am +Description: Amharic +Added: 2005-10-16 +Suppress-Script: Ethi +%% +Type: language +Subtag: an +Description: Aragonese +Added: 2005-10-16 +%% +Type: language +Subtag: ar +Description: Arabic +Added: 2005-10-16 +Suppress-Script: Arab +Scope: macrolanguage +%% +Type: language +Subtag: as +Description: Assamese +Added: 2005-10-16 +Suppress-Script: Beng +%% +Type: language +Subtag: av +Description: Avaric +Added: 2005-10-16 +%% +Type: language +Subtag: ay +Description: Aymara +Added: 2005-10-16 +Suppress-Script: Latn +Scope: macrolanguage +%% +Type: language +Subtag: az +Description: Azerbaijani +Added: 2005-10-16 +Scope: macrolanguage +%% +Type: language +Subtag: ba +Description: Bashkir +Added: 2005-10-16 +%% +Type: language +Subtag: be +Description: Belarusian +Added: 2005-10-16 +Suppress-Script: Cyrl +%% +Type: language +Subtag: bg +Description: Bulgarian +Added: 2005-10-16 +Suppress-Script: Cyrl +%% +Type: language +Subtag: bh +Description: Bihari languages +Added: 2005-10-16 +Scope: collection +%% +Type: language +Subtag: bi +Description: Bislama +Added: 2005-10-16 +%% +Type: language +Subtag: bm +Description: Bambara +Added: 2005-10-16 +%% +Type: language +Subtag: bn +Description: Bengali +Description: Bangla +Added: 2005-10-16 +Suppress-Script: Beng +%% +Type: language +Subtag: bo +Description: Tibetan +Added: 2005-10-16 +%% +Type: language +Subtag: br +Description: Breton +Added: 2005-10-16 +%% +Type: language +Subtag: bs +Description: Bosnian +Added: 2005-10-16 +Suppress-Script: Latn +Macrolanguage: sh +%% +Type: language +Subtag: ca +Description: Catalan +Description: Valencian +Added: 2005-10-16 +Suppress-Script: Latn +%% +Type: language +Subtag: ce +Description: +%% +Type: language +Subtag: ce +Description: + +File-Date: 2018-04-23 +%% +Type: language +Subtag: aa +Description: Afar +Added: 2005-10-16 +%% +Type: language +Subtag: ab +Description: Abkhazian +Added: 2005-10-16 +Suppress-Script: Cyrl +%% +Type: language +Subtag: ae +Description: Avestan +Added: 2005-10-16 +%% +Type: language +Subtag: af +Description: Afrikaans +Added: 2005-10-16 +Suppress-Script: Latn +%% +Type: language +Subtag: ak +Description: Akan +Added: 2005-10-16 +Scope: macrolanguage +%% +Type: language +Subtag: am +Description: Amharic +Added: 2005-10-16 +Suppress-Script: Ethi +%% +Type: language +Subtag: an +Description: Aragonese +Added: 2005-10-16 +%% +Type: language +Subtag: ar +Description: Arabic +Added: 2005-10-16 +Suppress-Script: Arab +Scope: macrolanguage +%% +Type: language +Subtag: as +Description: Assamese +Added: 2005-10-16 +Suppress-Script: Beng +%% +Type: language +Subtag: av +Description: Avaric +Added: 2005-10-16 +%% +Type: language +Subtag: ay +Description: Aymara +Added: 2005-10-16 +Suppress-Script: Latn +Scope: macrolanguage +%% +Type: language +Subtag: az +Description: Azerbaijani +Added: 2005-10-16 +Scope: macrolanguage +%% +Type: language +Subtag: ba +Description: Bashkir +Added: 2005-10-16 +%% +Type: language +Subtag: be +Description: Belarusian +Added: 2005-10-16 +Suppress-Script: Cyrl +%% +Type: language +Subtag: bg +Description: Bulgarian +Added: 2005-10-16 +Suppress-Script: Cyrl +%% +Type: language +Subtag: bh +Description: Bihari languages +Added: 2005-10-16 +Scope: collection +%% +Type: language +Subtag: bi +Description: Bislama +Added: 2005-10-16 +%% +Type: language +Subtag: bm +Description: Bambara +Added: 2005-10-16 +%% +Type: language +Subtag: bn +Description: Bengali +Description: Bangla +Added: 2005-10-16 +Suppress-Script: Beng +%% +Type: language +Subtag: bo +Description: Tibetan +Added: 2005-10-16 +%% +Type: language +Subtag: br +Description: Breton +Added: 2005-10-16 +%% +Type: language +Subtag: bs +Description: Bosnian +Added: 2005-10-16 +Suppress-Script: Latn +Macrolanguage: sh +%% +Type: language +Subtag: ca +Description: Catalan +Description: Valencian +Added: 2005-10-16 +Suppress-Script: Latn +%% +Type: language +Subtag: ce +Description: +%% +Type: language +Subtag: ce +Description: + +File-Date: 2018-04-23 +%% +Type: language +Subtag: aa +Description: Afar +Added: 2005-10-16 +%% +Type: language +Subtag: ab +Description: Abkhazian +Added: 2005-10-16 +Suppress-Script: Cyrl +%% +Type: language +Subtag: ae +Description: Avestan +Added: 2005-10-16 +%% +Type: language +Subtag: af +Description: Afrikaans +Added: 2005-10-16 +Suppress-Script: Latn +%% +Type: language +Subtag: ak +Description: Akan +Added: 2005-10-16 +Scope: macrolanguage +%% +Type: language +Subtag: am +Description: Amharic +Added: 2005-10-16 +Suppress-Script: Ethi +%% +Type: language +Subtag: an +Description: Aragonese +Added: 2005-10-16 +%% +Type: language +Subtag: ar +Description: Arabic +Added: 2005-10-16 +Suppress-Script: Arab +Scope: macrolanguage +%% +Type: language +Subtag: as +Description: Assamese +Added: 2005-10-16 +Suppress-Script: Beng +%% +Type: language +Subtag: av +Description: Avaric +Added: 2005-10-16 +%% +Type: language +Subtag: ay +Description: Aymara +Added: 2005-10-16 +Suppress-Script: Latn +Scope: macrolanguage +%% +Type: language +Subtag: az +Description: Azerbaijani +Added: 2005-10-16 +Scope: macrolanguage +%% +Type: language +Subtag: ba +Description: Bashkir +Added: 2005-10-16 +%% +Type: language +Subtag: be +Description: Belarusian +Added: 2005-10-16 +Suppress-Script: Cyrl +%% +Type: language +Subtag: bg +Description: Bulgarian +Added: 2005-10-16 +Suppress-Script: Cyrl +%% +Type: language +Subtag: bh +Description: Bihari languages +Added: 2005-10-16 +Scope: collection +%% +Type: language +Subtag: bi +Description: Bislama +Added: 2005-10-16 +%% +Type: language +Subtag: bm +Description: Bambara +Added: 2005-10-16 +%% +Type: language +Subtag: bn +Description: Bengali +Description: Bangla +Added: 2005-10-16 +Suppress-Script: Beng +%% +Type: language +Subtag: bo +Description: Tibetan +Added: 2005-10-16 +%% +Type: language +Subtag: br +Description: Breton +Added: 2005-10-16 +%% +Type: language +Subtag: bs +Description: Bosnian +Added: 2005-10-16 +Suppress-Script: Latn +Macrolanguage: sh +%% +Type: language +Subtag: ca +Description: Catalan +Description: Valencian +Added: 2005-10-16 +Suppress-Script: Latn +%% +Type: language +Subtag: ce +Description: +%% +Type: language +Subtag: ce +Description: diff --git a/extensions/resteasy-reactive/rest-client-reactive/deployment/src/test/resources/send-request-scoped-input-stream.properties b/extensions/resteasy-reactive/rest-client-reactive/deployment/src/test/resources/send-request-scoped-input-stream.properties new file mode 100644 index 0000000000000..80af14acfed4f --- /dev/null +++ b/extensions/resteasy-reactive/rest-client-reactive/deployment/src/test/resources/send-request-scoped-input-stream.properties @@ -0,0 +1 @@ +quarkus.rest-client.test.url=http://localhost:${quarkus.http.test-port:8081}/ diff --git a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/handlers/ClientSendRequestHandler.java b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/handlers/ClientSendRequestHandler.java index 06e2ba8da2477..f4668039940fd 100644 --- a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/handlers/ClientSendRequestHandler.java +++ b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/handlers/ClientSendRequestHandler.java @@ -3,6 +3,7 @@ import java.io.ByteArrayInputStream; import java.io.File; import java.io.IOException; +import java.io.InputStream; import java.net.MalformedURLException; import java.net.URI; import java.net.URL; @@ -21,6 +22,7 @@ import jakarta.ws.rs.core.MultivaluedMap; import jakarta.ws.rs.core.Response; import jakarta.ws.rs.core.Variant; +import jakarta.ws.rs.ext.WriterInterceptor; import org.jboss.logging.Logger; import org.jboss.resteasy.reactive.client.AsyncResultUni; @@ -28,6 +30,7 @@ import org.jboss.resteasy.reactive.client.api.LoggingScope; import org.jboss.resteasy.reactive.client.api.QuarkusRestClientProperties; import org.jboss.resteasy.reactive.client.impl.AsyncInvokerImpl; +import org.jboss.resteasy.reactive.client.impl.InputStreamReadStream; import org.jboss.resteasy.reactive.client.impl.RestClientRequestContext; import org.jboss.resteasy.reactive.client.impl.multipart.PausableHttpPostRequestEncoder; import org.jboss.resteasy.reactive.client.impl.multipart.QuarkusMultipartForm; @@ -165,6 +168,15 @@ public void handle(AsyncResult openedAsyncFile) { attachSentHandlers(sent, httpClientRequest, requestContext); } }); + } else if (requestContext.isInputStreamUpload() && !hasWriterInterceptors(requestContext)) { + MultivaluedMap headerMap = requestContext.getRequestHeaders() + .asMap(); + updateRequestHeadersFromConfig(requestContext, headerMap); + Future sent = httpClientRequest.send( + new InputStreamReadStream( + Vertx.currentContext().owner(), (InputStream) requestContext.getEntity().getEntity(), + httpClientRequest)); + attachSentHandlers(sent, httpClientRequest, requestContext); } else { Future sent; Buffer actualEntity; @@ -505,8 +517,7 @@ private Buffer setRequestHeadersAndPrepareBody(HttpClientRequest httpClientReque // no need to set the entity.getMediaType, it comes from the variant setEntityRelatedHeaders(headerMap, entity); - actualEntity = state.writeEntity(entity, headerMap, - state.getConfiguration().getWriterInterceptors().toArray(Serialisers.NO_WRITER_INTERCEPTOR)); + actualEntity = state.writeEntity(entity, headerMap, getWriterInterceptors(state)); } else { // some servers don't like the fact that a POST or PUT does not have a method body if there is no content-length header associated if (state.getHttpMethod().equals("POST") || state.getHttpMethod().equals("PUT")) { @@ -518,6 +529,15 @@ private Buffer setRequestHeadersAndPrepareBody(HttpClientRequest httpClientReque return actualEntity; } + private WriterInterceptor[] getWriterInterceptors(RestClientRequestContext context) { + return context.getConfiguration().getWriterInterceptors().toArray(Serialisers.NO_WRITER_INTERCEPTOR); + } + + private boolean hasWriterInterceptors(RestClientRequestContext context) { + WriterInterceptor[] interceptors = getWriterInterceptors(context); + return interceptors != null && interceptors.length > 0; + } + private void adaptRequest(HttpClientRequest request) { if (request.version() == HttpVersion.HTTP_2) { // When using the protocol HTTP/2, Netty which is internally used by Vert.x will validate the headers and reject diff --git a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/InputStreamReadStream.java b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/InputStreamReadStream.java new file mode 100644 index 0000000000000..4930cfefd2ff1 --- /dev/null +++ b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/InputStreamReadStream.java @@ -0,0 +1,179 @@ +package org.jboss.resteasy.reactive.client.impl; + +import java.io.IOException; +import java.io.InputStream; +import java.util.concurrent.atomic.AtomicInteger; + +import io.vertx.core.AsyncResult; +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.buffer.impl.VertxByteBufAllocator; +import io.vertx.core.http.HttpClientRequest; +import io.vertx.core.streams.ReadStream; +import io.vertx.core.streams.impl.InboundBuffer; + +/** + * Copied almost verbatim from Kubernetes + * Client + *

+ * TODO: There is a chance that something like this will land in Vert.x in the future, so we should check back in the future + */ +public class InputStreamReadStream implements ReadStream { + + private static final int CHUNK_SIZE = 2048; + private static final int MAX_DEPTH = 8; + + private final Buffer endSentinel; + private final Vertx vertx; + private final InputStream is; + private final HttpClientRequest request; + private InboundBuffer inboundBuffer; + private Handler exceptionHandler; + private Handler endHandler; + private byte[] bytes; + + public InputStreamReadStream(Vertx vertx, InputStream is, HttpClientRequest request) { + this.vertx = vertx; + this.is = is; + this.request = request; + endSentinel = Buffer.buffer(); + } + + @Override + public ReadStream exceptionHandler(Handler handler) { + exceptionHandler = handler; + return this; + } + + final ThreadLocal counter = new ThreadLocal() { + @Override + protected AtomicInteger initialValue() { + return new AtomicInteger(); + } + }; + + @Override + public ReadStream handler(Handler handler) { + boolean start = inboundBuffer == null && handler != null; + if (start) { + inboundBuffer = new InboundBuffer<>(vertx.getOrCreateContext()); + inboundBuffer.drainHandler(new Handler<>() { + @Override + public void handle(Void event) { + readChunk(); + } + }); + } + if (handler != null) { + inboundBuffer.handler(new Handler<>() { + @Override + public void handle(Buffer buff) { + if (buff == endSentinel) { + if (endHandler != null) { + endHandler.handle(null); + } + } else { + handler.handle(buff); + } + } + }); + } else { + inboundBuffer.handler(null); + } + if (start) { + readChunk(); + } + return this; + } + + private void readChunk() { + AtomicInteger atomicInteger = counter.get(); + try { + int depth = atomicInteger.getAndIncrement(); + if (depth < MAX_DEPTH) { + readChunk2(); + return; + } + } finally { + atomicInteger.decrementAndGet(); + } + vertx.runOnContext(v -> readChunk()); + } + + private void readChunk2() { + Future fut = vertx.executeBlocking(new Handler<>() { + @Override + public void handle(Promise p) { + if (bytes == null) { + bytes = new byte[CHUNK_SIZE]; + } + int amount; + try { + amount = is.read(bytes); + } catch (IOException e) { + p.fail(e); + return; + } + if (amount == -1) { + p.complete(); + } else { + p.complete( + Buffer.buffer( + VertxByteBufAllocator.DEFAULT.heapBuffer(amount, Integer.MAX_VALUE).writeBytes(bytes, 0, + amount))); + } + } + }); + fut.onComplete(new Handler<>() { + @Override + public void handle(AsyncResult ar) { + if (ar.succeeded()) { + Buffer chunk = ar.result(); + if (chunk != null) { + boolean writable = inboundBuffer.write(chunk); + if (writable) { + readChunk(); + } else { + // Full + } + } else { + inboundBuffer.write(endSentinel); + } + } else { + if (exceptionHandler != null) { + exceptionHandler.handle(ar.cause()); + } + request.reset(0, ar.cause()); + } + } + }); + } + + @Override + public ReadStream endHandler(Handler handler) { + endHandler = handler; + return this; + } + + @Override + public ReadStream pause() { + inboundBuffer.pause(); + return this; + } + + @Override + public ReadStream resume() { + inboundBuffer.resume(); + return this; + } + + @Override + public ReadStream fetch(long amount) { + inboundBuffer.fetch(amount); + return this; + } +} diff --git a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/RestClientRequestContext.java b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/RestClientRequestContext.java index dca9e4fe33bf7..0d0c5fec74615 100644 --- a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/RestClientRequestContext.java +++ b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/RestClientRequestContext.java @@ -494,6 +494,10 @@ public boolean isFileUpload() { return entity != null && ((entity.getEntity() instanceof File) || (entity.getEntity() instanceof Path)); } + public boolean isInputStreamUpload() { + return entity != null && entity.getEntity() instanceof InputStream; + } + public boolean isMultipart() { return entity != null && entity.getEntity() instanceof QuarkusMultipartForm; }