Skip to content

Commit

Permalink
s2a: Address comments on S2A channel + stub (#11584)
Browse files Browse the repository at this point in the history
* delete HandshakerServiceChannel.

* remove usage of S2AGrpcChannelPool + avoid creating Channel ref per conn.
  • Loading branch information
rmehta19 authored Oct 2, 2024
1 parent b8a0ba4 commit 959060a
Show file tree
Hide file tree
Showing 8 changed files with 44 additions and 517 deletions.
43 changes: 0 additions & 43 deletions s2a/src/main/java/io/grpc/s2a/internal/channel/S2AChannelPool.java

This file was deleted.

109 changes: 0 additions & 109 deletions s2a/src/main/java/io/grpc/s2a/internal/channel/S2AGrpcChannelPool.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,9 @@
import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.concurrent.TimeUnit.SECONDS;

import com.google.common.annotations.VisibleForTesting;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ChannelCredentials;
import io.grpc.ClientCall;
import io.grpc.ManagedChannel;
import io.grpc.MethodDescriptor;
import io.grpc.internal.SharedResourceHolder.Resource;
import io.grpc.netty.NettyChannelBuilder;
import java.time.Duration;
Expand Down Expand Up @@ -55,6 +51,8 @@
@ThreadSafe
public final class S2AHandshakerServiceChannel {
private static final Duration CHANNEL_SHUTDOWN_TIMEOUT = Duration.ofSeconds(10);
private static final Logger logger =
Logger.getLogger(S2AHandshakerServiceChannel.class.getName());

/**
* Returns a {@link SharedResourceHolder.Resource} instance for managing channels to an S2A server
Expand Down Expand Up @@ -86,75 +84,35 @@ public ChannelResource(String targetAddress, ChannelCredentials channelCredentia
}

/**
* Creates a {@code HandshakerServiceChannel} instance to the service running at {@code
* Creates a {@code ManagedChannel} instance to the service running at {@code
* targetAddress}.
*/
@Override
public Channel create() {
ManagedChannel channel =
NettyChannelBuilder.forTarget(targetAddress, channelCredentials)
return NettyChannelBuilder.forTarget(targetAddress, channelCredentials)
.directExecutor()
.idleTimeout(5, SECONDS)
.build();
return HandshakerServiceChannel.create(channel);
}

/** Destroys a {@code HandshakerServiceChannel} instance. */
/** Destroys a {@code ManagedChannel} instance. */
@Override
public void close(Channel instanceChannel) {
checkNotNull(instanceChannel);
HandshakerServiceChannel channel = (HandshakerServiceChannel) instanceChannel;
channel.close();
}

@Override
public String toString() {
return "grpc-s2a-channel";
}
}

/**
* Manages a channel using a {@link ManagedChannel} instance.
*/
@VisibleForTesting
static class HandshakerServiceChannel extends Channel {
private static final Logger logger =
Logger.getLogger(S2AHandshakerServiceChannel.class.getName());
private final ManagedChannel delegate;

static HandshakerServiceChannel create(ManagedChannel delegate) {
checkNotNull(delegate);
return new HandshakerServiceChannel(delegate);
}

private HandshakerServiceChannel(ManagedChannel delegate) {
this.delegate = delegate;
}

/**
* Returns the address of the service to which the {@code delegate} channel connects, which is
* typically of the form {@code host:port}.
*/
@Override
public String authority() {
return delegate.authority();
}

/** Creates a {@link ClientCall} that invokes the operations in {@link MethodDescriptor}. */
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(
MethodDescriptor<ReqT, RespT> methodDescriptor, CallOptions options) {
return delegate.newCall(methodDescriptor, options);
}

@SuppressWarnings("FutureReturnValueIgnored")
public void close() {
delegate.shutdownNow();
ManagedChannel channel = (ManagedChannel) instanceChannel;
channel.shutdownNow();
try {
delegate.awaitTermination(CHANNEL_SHUTDOWN_TIMEOUT.getSeconds(), SECONDS);
channel.awaitTermination(CHANNEL_SHUTDOWN_TIMEOUT.getSeconds(), SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.log(Level.WARNING, "Channel to S2A was not shutdown.");
}

}

@Override
public String toString() {
return "grpc-s2a-channel";
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@
import io.grpc.netty.InternalProtocolNegotiator.ProtocolNegotiator;
import io.grpc.netty.InternalProtocolNegotiators;
import io.grpc.netty.InternalProtocolNegotiators.ProtocolNegotiationHandler;
import io.grpc.s2a.internal.channel.S2AChannelPool;
import io.grpc.s2a.internal.channel.S2AGrpcChannelPool;
import io.grpc.s2a.internal.handshaker.S2AIdentity;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerAdapter;
Expand Down Expand Up @@ -70,17 +68,16 @@ public final class S2AProtocolNegotiatorFactory {
public static InternalProtocolNegotiator.ClientFactory createClientFactory(
@Nullable S2AIdentity localIdentity, ObjectPool<Channel> s2aChannelPool) {
checkNotNull(s2aChannelPool, "S2A channel pool should not be null.");
S2AChannelPool channelPool = S2AGrpcChannelPool.create(s2aChannelPool);
return new S2AClientProtocolNegotiatorFactory(localIdentity, channelPool);
return new S2AClientProtocolNegotiatorFactory(localIdentity, s2aChannelPool);
}

static final class S2AClientProtocolNegotiatorFactory
implements InternalProtocolNegotiator.ClientFactory {
private final @Nullable S2AIdentity localIdentity;
private final S2AChannelPool channelPool;
private final ObjectPool<Channel> channelPool;

S2AClientProtocolNegotiatorFactory(
@Nullable S2AIdentity localIdentity, S2AChannelPool channelPool) {
@Nullable S2AIdentity localIdentity, ObjectPool<Channel> channelPool) {
this.localIdentity = localIdentity;
this.channelPool = channelPool;
}
Expand All @@ -100,13 +97,14 @@ public int getDefaultPort() {
@VisibleForTesting
static final class S2AProtocolNegotiator implements ProtocolNegotiator {

private final S2AChannelPool channelPool;
private final ObjectPool<Channel> channelPool;
private final Channel channel;
private final Optional<S2AIdentity> localIdentity;
private final ListeningExecutorService service =
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));

static S2AProtocolNegotiator createForClient(
S2AChannelPool channelPool, @Nullable S2AIdentity localIdentity) {
ObjectPool<Channel> channelPool, @Nullable S2AIdentity localIdentity) {
checkNotNull(channelPool, "Channel pool should not be null.");
if (localIdentity == null) {
return new S2AProtocolNegotiator(channelPool, Optional.empty());
Expand All @@ -123,9 +121,11 @@ static S2AProtocolNegotiator createForClient(
return HostAndPort.fromString(authority).getHost();
}

private S2AProtocolNegotiator(S2AChannelPool channelPool, Optional<S2AIdentity> localIdentity) {
private S2AProtocolNegotiator(ObjectPool<Channel> channelPool,
Optional<S2AIdentity> localIdentity) {
this.channelPool = channelPool;
this.localIdentity = localIdentity;
this.channel = channelPool.getObject();
}

@Override
Expand All @@ -139,13 +139,13 @@ public ChannelHandler newHandler(GrpcHttp2ConnectionHandler grpcHandler) {
String hostname = getHostNameFromAuthority(grpcHandler.getAuthority());
checkArgument(!isNullOrEmpty(hostname), "hostname should not be null or empty.");
return new S2AProtocolNegotiationHandler(
grpcHandler, channelPool, localIdentity, hostname, service);
grpcHandler, channel, localIdentity, hostname, service);
}

@Override
public void close() {
service.shutdown();
channelPool.close();
channelPool.returnObject(channel);
}
}

Expand Down Expand Up @@ -180,15 +180,15 @@ public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
}

private static final class S2AProtocolNegotiationHandler extends ProtocolNegotiationHandler {
private final S2AChannelPool channelPool;
private final Channel channel;
private final Optional<S2AIdentity> localIdentity;
private final String hostname;
private final GrpcHttp2ConnectionHandler grpcHandler;
private final ListeningExecutorService service;

private S2AProtocolNegotiationHandler(
GrpcHttp2ConnectionHandler grpcHandler,
S2AChannelPool channelPool,
Channel channel,
Optional<S2AIdentity> localIdentity,
String hostname,
ListeningExecutorService service) {
Expand All @@ -204,7 +204,7 @@ public void handlerAdded(ChannelHandlerContext ctx) {
},
grpcHandler.getNegotiationLogger());
this.grpcHandler = grpcHandler;
this.channelPool = channelPool;
this.channel = channel;
this.localIdentity = localIdentity;
this.hostname = hostname;
checkNotNull(service, "service should not be null.");
Expand All @@ -217,8 +217,7 @@ protected void handlerAdded0(ChannelHandlerContext ctx) {
BufferReadsHandler bufferReads = new BufferReadsHandler();
ctx.pipeline().addBefore(ctx.name(), /* name= */ null, bufferReads);

Channel ch = channelPool.getChannel();
S2AServiceGrpc.S2AServiceStub stub = S2AServiceGrpc.newStub(ch);
S2AServiceGrpc.S2AServiceStub stub = S2AServiceGrpc.newStub(channel);
S2AStub s2aStub = S2AStub.newInstance(stub);

ListenableFuture<SslContext> sslContextFuture =
Expand Down
Loading

0 comments on commit 959060a

Please sign in to comment.