Skip to content

Commit

Permalink
✨ ClientHttpRequest 异步 API
Browse files Browse the repository at this point in the history
  • Loading branch information
TAKETODAY committed Sep 24, 2024
1 parent b642308 commit 1fbf067
Show file tree
Hide file tree
Showing 14 changed files with 94 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.concurrent.Executor;

import cn.taketoday.http.client.ClientHttpRequest;
import cn.taketoday.http.client.ClientHttpResponse;
import cn.taketoday.http.client.support.HttpRequestDecorator;
import cn.taketoday.lang.Assert;
import cn.taketoday.lang.Nullable;
import cn.taketoday.mock.http.client.MockClientHttpRequest;
import cn.taketoday.test.web.client.ExpectedCount;
import cn.taketoday.test.web.client.MockRestServiceServer;
Expand Down Expand Up @@ -202,6 +204,11 @@ public Future<ClientHttpResponse> async() {
return getRequest().async();
}

@Override
public Future<ClientHttpResponse> async(@Nullable Executor executor) {
return getRequest().async(executor);
}

@Override
public ClientHttpRequest getRequest() {
return (ClientHttpRequest) super.getRequest();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.concurrent.Executor;

import cn.taketoday.http.HttpMethod;
import cn.taketoday.http.client.ClientHttpRequest;
Expand Down Expand Up @@ -113,8 +114,8 @@ public final ClientHttpResponse execute() throws IOException {
}

@Override
public Future<ClientHttpResponse> async() {
return Future.run(this::execute);
public Future<ClientHttpResponse> async(@Nullable Executor executor) {
return Future.run(this::execute, executor);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.Executor;

import cn.taketoday.http.AbstractHttpRequest;
import cn.taketoday.http.HttpHeaders;
Expand Down Expand Up @@ -72,10 +73,10 @@ public final ClientHttpResponse execute() throws IOException {
}

@Override
public Future<ClientHttpResponse> async() {
public Future<ClientHttpResponse> async(@Nullable Executor executor) {
assertNotExecuted();
this.executed = true;
return asyncInternal(headers);
return asyncInternal(headers, executor);
}

/**
Expand Down Expand Up @@ -111,9 +112,9 @@ protected abstract ClientHttpResponse executeInternal(HttpHeaders headers)
* @param headers the HTTP headers
* @return the response object for the executed request
*/
protected Future<ClientHttpResponse> asyncInternal(HttpHeaders headers) {
protected Future<ClientHttpResponse> asyncInternal(HttpHeaders headers, @Nullable Executor executor) {
// todo 这样实现肯定不行
return Future.run(() -> executeInternal(headers));
return Future.run(() -> executeInternal(headers), executor);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.Executor;

import cn.taketoday.http.HttpHeaders;
import cn.taketoday.http.StreamingHttpOutputMessage;
Expand Down Expand Up @@ -72,11 +73,11 @@ protected final ClientHttpResponse executeInternal(HttpHeaders headers) throws I
}

@Override
protected final Future<ClientHttpResponse> asyncInternal(HttpHeaders headers) {
protected final Future<ClientHttpResponse> asyncInternal(HttpHeaders headers, @Nullable Executor executor) {
if (this.body == null && this.bodyStream != null) {
this.body = outputStream -> this.bodyStream.writeTo(outputStream);
}
return asyncInternal(headers, body);
return asyncInternal(headers, body, executor);
}

/**
Expand All @@ -96,9 +97,9 @@ protected abstract ClientHttpResponse executeInternal(HttpHeaders headers, @Null
* @param body the HTTP body, may be {@code null} if no body was {@linkplain #setBody(Body) set}
* @return the response object for the executed request
*/
protected Future<ClientHttpResponse> asyncInternal(HttpHeaders headers, @Nullable Body body) {
protected Future<ClientHttpResponse> asyncInternal(HttpHeaders headers, @Nullable Body body, @Nullable Executor executor) {
// todo 这样实现肯定不行
return Future.run(() -> executeInternal(headers, body));
return Future.run(() -> executeInternal(headers, body), executor);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
package cn.taketoday.http.client;

import java.io.IOException;
import java.util.concurrent.Executor;

import cn.taketoday.http.HttpMethod;
import cn.taketoday.http.HttpOutputMessage;
import cn.taketoday.http.HttpRequest;
import cn.taketoday.lang.Nullable;
import cn.taketoday.util.concurrent.Future;

/**
Expand Down Expand Up @@ -49,9 +51,35 @@ public interface ClientHttpRequest extends HttpRequest, HttpOutputMessage {
/**
* Execute this request async, resulting in a {@code Future<ClientHttpResponse>} that can be read.
*
* <p> The returned future completes exceptionally with:
* <ul>
* <li>{@link IOException} - if an I/O error occurs when sending or receiving</li>
* </ul>
*
* <p>
* NOT Fully async {@link ClientHttpResponse#getBody()}
*
* @return the async response result of the execution
* @since 5.0
*/
default Future<ClientHttpResponse> async() {
return async(null);
}

/**
* Execute this request async, resulting in a {@code Future<ClientHttpResponse>} that can be read.
*
* <p> The returned future completes exceptionally with:
* <ul>
* <li>{@link IOException} - if an I/O error occurs when sending or receiving</li>
* </ul>
*
* <p>
* NOT Fully async {@link ClientHttpResponse#getBody()}
*
* @return the async response result of the execution
* @since 5.0
*/
Future<ClientHttpResponse> async();
Future<ClientHttpResponse> async(@Nullable Executor executor);

}
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
/*
* Original Author -> Harry Yang (taketoday@foxmail.com) https://taketoday.cn
* Copyright © Harry Yang & 2017 - 2023 All Rights Reserved.
*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER
* Copyright 2017 - 2024 the original author or authors.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
Expand All @@ -15,13 +12,12 @@
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see [http://www.gnu.org/licenses/]
* along with this program. If not, see [https://www.gnu.org/licenses/]
*/

package cn.taketoday.http.client;

import java.io.Closeable;
import java.io.IOException;

import cn.taketoday.http.HttpInputMessage;
import cn.taketoday.http.HttpStatusCode;
Expand All @@ -44,30 +40,27 @@ public interface ClientHttpResponse extends HttpInputMessage, Closeable {
* Get the HTTP status code as an {@link HttpStatusCode}.
*
* @return the HTTP status as {@code HttpStatusCode} value (never {@code null})
* @throws IOException in case of I/O errors
*/
HttpStatusCode getStatusCode() throws IOException;
HttpStatusCode getStatusCode();

/**
* Get the HTTP status code (potentially non-standard and not
* resolvable through the {@link HttpStatusCode} enum) as an integer.
*
* @return the HTTP status as an integer value
* @throws IOException in case of I/O errors
* @see #getStatusCode()
* @see HttpStatusCode#valueOf(int)
*/
default int getRawStatusCode() throws IOException {
default int getRawStatusCode() {
return getStatusCode().value();
}

/**
* Get the HTTP status text of the response.
*
* @return the HTTP status text
* @throws IOException in case of I/O errors
*/
String getStatusText() throws IOException;
String getStatusText();

/**
* Close this response, freeing any resources created.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
/*
* Original Author -> Harry Yang (taketoday@foxmail.com) https://taketoday.cn
* Copyright © TODAY & 2017 - 2022 All Rights Reserved.
*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER
* Copyright 2017 - 2024 the original author or authors.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
Expand All @@ -15,7 +12,7 @@
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see [http://www.gnu.org/licenses/]
* along with this program. If not, see [https://www.gnu.org/licenses/]
*/

package cn.taketoday.http.client;
Expand Down Expand Up @@ -52,17 +49,17 @@ public HttpHeaders getHeaders() {
}

@Override
public HttpStatusCode getStatusCode() throws IOException {
public HttpStatusCode getStatusCode() {
return delegate.getStatusCode();
}

@Override
public int getRawStatusCode() throws IOException {
public int getRawStatusCode() {
return delegate.getRawStatusCode();
}

@Override
public String getStatusText() throws IOException {
public String getStatusText() {
return delegate.getStatusText();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,9 @@ protected ClientHttpResponse executeInternal(HttpHeaders headers, @Nullable Body
}

@Override
protected Future<ClientHttpResponse> asyncInternal(HttpHeaders headers, @Nullable Body body) {
protected Future<ClientHttpResponse> asyncInternal(HttpHeaders headers, @Nullable Body body, @Nullable Executor executor) {
HttpRequest request = buildRequest(headers, body);
return Future.forAdaption(httpClient.sendAsync(request, BodyHandlers.ofInputStream()))
return Future.forAdaption(httpClient.sendAsync(request, BodyHandlers.ofInputStream()), executor)
.map(JdkClientHttpResponse::new);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,13 +146,13 @@ static IOException convertException(RuntimeException ex) {
}

@Override
protected Future<ClientHttpResponse> asyncInternal(HttpHeaders headers, @Nullable Body body) {
protected Future<ClientHttpResponse> asyncInternal(HttpHeaders headers, @Nullable Body body, @Nullable Executor executor) {
HttpClient.RequestSender requestSender = httpClient
.request(io.netty.handler.codec.http.HttpMethod.valueOf(method.name()));

requestSender = uri.isAbsolute() ? requestSender.uri(uri) : requestSender.uri(uri.toString());

Promise<ClientHttpResponse> promise = Future.forPromise();
Promise<ClientHttpResponse> promise = Future.forPromise(executor);
requestSender.send((reactorRequest, nettyOutbound) -> send(headers, body, reactorRequest, nettyOutbound))
.responseConnection((reactorResponse, connection) -> Mono.just(new ReactorClientHttpResponse(reactorResponse, connection, readTimeout)))
.next()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -616,14 +616,9 @@ public <T> ResponseEntity<T> toEntity(ParameterizedTypeReference<T> bodyType) {

private <T> ResponseEntity<T> toEntityInternal(Type bodyType, Class<T> bodyClass) {
T body = readBody(bodyType, bodyClass);
try {
return ResponseEntity.status(this.clientResponse.getStatusCode())
.headers(this.clientResponse.getHeaders())
.body(body);
}
catch (IOException ex) {
throw new ResourceAccessException("Could not retrieve response status code: " + ex.getMessage(), ex);
}
return ResponseEntity.status(this.clientResponse.getStatusCode())
.headers(this.clientResponse.getHeaders())
.body(body);
}

@Override
Expand All @@ -637,9 +632,6 @@ public ResponseEntity<Void> toBodilessEntity() {
catch (UncheckedIOException ex) {
throw new ResourceAccessException("Could not retrieve response status code: " + ex.getMessage(), ex.getCause());
}
catch (IOException ex) {
throw new ResourceAccessException("Could not retrieve response status code: " + ex.getMessage(), ex);
}
}

@Nullable
Expand Down Expand Up @@ -699,12 +691,17 @@ public HttpHeaders getHeaders() {
}

@Override
public HttpStatusCode getStatusCode() throws IOException {
public HttpStatusCode getStatusCode() {
return this.delegate.getStatusCode();
}

@Override
public String getStatusText() throws IOException {
public int getRawStatusCode() {
return delegate.getRawStatusCode();
}

@Override
public String getStatusText() {
return this.delegate.getStatusText();
}

Expand Down
14 changes: 4 additions & 10 deletions today-web/src/main/java/cn/taketoday/web/client/RestTemplate.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import cn.taketoday.http.HttpEntity;
import cn.taketoday.http.HttpHeaders;
import cn.taketoday.http.HttpMethod;
import cn.taketoday.http.HttpStatusCode;
import cn.taketoday.http.MediaType;
import cn.taketoday.http.RequestEntity;
import cn.taketoday.http.ResponseEntity;
Expand Down Expand Up @@ -702,8 +701,8 @@ public <T> T execute(URI url, HttpMethod method, @Nullable RequestCallback reque
* @return an arbitrary object, as returned by the {@link ResponseExtractor}
*/
@Nullable
protected <T> T doExecute(URI url, @Nullable HttpMethod method,
@Nullable RequestCallback requestCallback, @Nullable ResponseExtractor<T> responseExtractor) throws RestClientException {
protected <T> T doExecute(URI url, @Nullable HttpMethod method, @Nullable RequestCallback requestCallback,
@Nullable ResponseExtractor<T> responseExtractor) throws RestClientException {

Assert.notNull(url, "URI is required");
Assert.notNull(method, "HttpMethod is required");
Expand Down Expand Up @@ -746,13 +745,7 @@ protected void handleResponse(URI url, HttpMethod method, ClientHttpResponse res
ResponseErrorHandler errorHandler = getErrorHandler();
boolean hasError = errorHandler.hasError(response);
if (logger.isDebugEnabled()) {
try {
HttpStatusCode status = response.getStatusCode();
logger.debug("{} Response {}", url, status);
}
catch (IOException ex) {
logger.debug("Failed to get response status code", ex);
}
logger.debug("{} Response {}", url, response.getStatusCode());
}
if (hasError) {
errorHandler.handleError(url, method, response);
Expand Down Expand Up @@ -965,6 +958,7 @@ private static void copyHttpHeaders(HttpHeaders httpHeaders, HttpHeaders request
* Response extractor for {@link HttpEntity}.
*/
private class ResponseEntityResponseExtractor<T> implements ResponseExtractor<ResponseEntity<T>> {

@Nullable
private final HttpMessageConverterExtractor<T> delegate;

Expand Down
Loading

0 comments on commit 1fbf067

Please sign in to comment.