Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add GrpcChannelOptions.HttpHandler and channel default to invoker #896

Merged
merged 3 commits into from
May 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,9 @@ public void GlobalSetup()
return ResponseUtils.CreateResponse(HttpStatusCode.OK, content, grpcEncoding: ResponseCompressionAlgorithm);
});

var httpClient = new HttpClient(handler);

var channel = GrpcChannel.ForAddress("http://localhost", new GrpcChannelOptions
{
HttpClient = httpClient,
HttpHandler = handler,
CompressionProviders = CompressionProviders
});

Expand Down
76 changes: 55 additions & 21 deletions src/Grpc.Net.Client/GrpcChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading;
using Grpc.Core;
Expand All @@ -40,9 +41,11 @@ public sealed class GrpcChannel : ChannelBase, IDisposable

private readonly ConcurrentDictionary<IMethod, GrpcMethodInfo> _methodInfoCache;
private readonly Func<IMethod, GrpcMethodInfo> _createMethodInfoFunc;
// Internal for testing
internal readonly HashSet<IDisposable> ActiveCalls;

internal Uri Address { get; }
internal HttpClient HttpClient { get; }
internal HttpMessageInvoker HttpInvoker { get; }
internal int? SendMaxMessageSize { get; }
internal int? ReceiveMaxMessageSize { get; }
internal ILoggerFactory LoggerFactory { get; }
Expand All @@ -63,20 +66,22 @@ internal GrpcChannel(Uri address, GrpcChannelOptions channelOptions) : base(addr
{
_methodInfoCache = new ConcurrentDictionary<IMethod, GrpcMethodInfo>();

// Dispose the HttpClient if...
// 1. No client was specified and so the channel created the HttpClient itself
// 2. User has specified a client and set DisposeHttpClient to true
_shouldDisposeHttpClient = channelOptions.HttpClient == null || channelOptions.DisposeHttpClient;
// Dispose the HTTP client/handler if...
// 1. No client/handler was specified and so the channel created the client itself
// 2. User has specified a client/handler and set DisposeHttpClient to true
_shouldDisposeHttpClient = (channelOptions.HttpClient == null && channelOptions.HttpHandler == null)
|| channelOptions.DisposeHttpClient;

Address = address;
HttpClient = channelOptions.HttpClient ?? CreateInternalHttpClient();
HttpInvoker = channelOptions.HttpClient ?? CreateInternalHttpInvoker(channelOptions.HttpHandler);
SendMaxMessageSize = channelOptions.MaxSendMessageSize;
ReceiveMaxMessageSize = channelOptions.MaxReceiveMessageSize;
CompressionProviders = ResolveCompressionProviders(channelOptions.CompressionProviders);
MessageAcceptEncoding = GrpcProtocolHelpers.GetMessageAcceptEncoding(CompressionProviders);
LoggerFactory = channelOptions.LoggerFactory ?? NullLoggerFactory.Instance;
ThrowOperationCanceledOnCancellation = channelOptions.ThrowOperationCanceledOnCancellation;
_createMethodInfoFunc = CreateMethodInfo;
ActiveCalls = new HashSet<IDisposable>();

if (channelOptions.Credentials != null)
{
Expand All @@ -90,21 +95,29 @@ internal GrpcChannel(Uri address, GrpcChannelOptions channelOptions) : base(addr
}
}

private static HttpClient CreateInternalHttpClient()
private static HttpMessageInvoker CreateInternalHttpInvoker(HttpMessageHandler? handler)
{
var httpClient = new HttpClient();

// Long running server and duplex streaming gRPC requests may not
// return any messages for over 100 seconds, triggering a cancellation
// of HttpClient.SendAsync. Disable timeout in internally created
// HttpClient for channel.
//
// gRPC deadline should be the recommended way to timeout gRPC calls.
//
// https://github.com/dotnet/corefx/issues/41650
httpClient.Timeout = Timeout.InfiniteTimeSpan;

return httpClient;
// HttpMessageInvoker should always dispose handler if Disposed is called on it.
// Decision to dispose invoker is controlled by _shouldDisposeHttpClient.
var httpInvoker = new HttpMessageInvoker(handler ?? new HttpClientHandler(), disposeHandler: true);
JamesNK marked this conversation as resolved.
Show resolved Hide resolved

return httpInvoker;
}

internal void RegisterActiveCall(IDisposable grpcCall)
{
lock (ActiveCalls)
{
ActiveCalls.Add(grpcCall);
}
}

internal void FinishActiveCall(IDisposable grpcCall)
{
lock (ActiveCalls)
{
ActiveCalls.Remove(grpcCall);
}
}

internal GrpcMethodInfo GetCachedGrpcMethodInfo(IMethod method)
Expand Down Expand Up @@ -261,6 +274,12 @@ public static GrpcChannel ForAddress(Uri address, GrpcChannelOptions channelOpti
throw new ArgumentNullException(nameof(channelOptions));
}

if (channelOptions.HttpClient != null && channelOptions.HttpHandler != null)
{
throw new ArgumentException($"{nameof(GrpcChannelOptions.HttpClient)} and {nameof(GrpcChannelOptions.HttpHandler)} have been configured. " +
$"Only one HTTP caller can be specified.");
}

return new GrpcChannel(address, channelOptions);
}

Expand All @@ -275,9 +294,24 @@ public void Dispose()
return;
}

lock (ActiveCalls)
{
if (ActiveCalls.Count > 0)
{
// Disposing a call will remove it from ActiveCalls. Need to take a copy
// to avoid enumeration from being modified
var activeCallsCopy = ActiveCalls.ToArray();

foreach (var activeCall in activeCallsCopy)
{
activeCall.Dispose();
}
}
}

if (_shouldDisposeHttpClient)
{
HttpClient.Dispose();
HttpInvoker.Dispose();
}
Disposed = true;
}
Expand Down
35 changes: 29 additions & 6 deletions src/Grpc.Net.Client/GrpcChannelOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,23 +68,46 @@ public sealed class GrpcChannelOptions
public ILoggerFactory? LoggerFactory { get; set; }

/// <summary>
/// Gets or sets the <see cref="HttpClient"/> used by the channel.
/// Gets or sets the <see cref="System.Net.Http.HttpClient"/> used by the channel to make HTTP calls.
/// </summary>
/// <remarks>
/// <para>
/// By default a <see cref="System.Net.Http.HttpClient"/> specified here will not be disposed with the channel.
/// To dispose the <see cref="System.Net.Http.HttpClient"/> with the channel you must set <see cref="DisposeHttpClient"/>
/// to <c>true</c>.
/// </para>
/// <para>
/// Only one HTTP caller can be specified for a channel. An error will be thrown if this is configured
/// together with <see cref="HttpHandler"/>.
/// </para>
/// </remarks>
public HttpClient? HttpClient { get; set; }

/// <summary>
/// Gets or sets a value indicating whether the underlying <see cref="System.Net.Http.HttpClient"/> should be disposed
/// when the <see cref="GrpcChannel"/> instance is disposed. The default value is <c>false</c>.
/// Gets or sets the <see cref="HttpMessageHandler"/> used by the channel to make HTTP calls.
/// </summary>
/// <remarks>
/// <para>
/// By default a <see cref="HttpMessageHandler"/> specified here will not be disposed with the channel.
/// To dispose the <see cref="HttpMessageHandler"/> with the channel you must set <see cref="DisposeHttpClient"/>
/// to <c>true</c>.
/// </para>
/// <para>
/// Only one HTTP caller can be specified for a channel. An error will be thrown if this is configured
/// together with <see cref="HttpClient"/>.
/// </para>
/// </remarks>
public HttpMessageHandler? HttpHandler { get; set; }

/// <summary>
/// Gets or sets a value indicating whether the underlying <see cref="System.Net.Http.HttpClient"/> or
/// <see cref="HttpMessageHandler"/> should be disposed when the <see cref="GrpcChannel"/> instance is disposed.
/// The default value is <c>false</c>.
/// </summary>
/// <remarks>
/// This setting is used when a <see cref="HttpClient"/> value is specified. If no <see cref="HttpClient"/> value is provided
/// then the channel will create an <see cref="System.Net.Http.HttpClient"/> instance that is always disposed when
/// the channel is disposed.
/// This setting is used when a <see cref="HttpClient"/> or <see cref="HttpHandler"/> value is specified.
/// If they are not specified then the channel will create an internal HTTP caller that is always disposed
/// when the channel is disposed.
/// </remarks>
public bool DisposeHttpClient { get; set; }

Expand Down
21 changes: 15 additions & 6 deletions src/Grpc.Net.Client/Internal/GrpcCall.cs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ public GrpcCall(Method<TRequest, TResponse> method, GrpcMethodInfo grpcMethodInf
Channel = channel;
Logger = channel.LoggerFactory.CreateLogger(LoggerName);
_deadline = options.Deadline ?? DateTime.MaxValue;

Channel.RegisterActiveCall(this);
}

private void ValidateDeadline(DateTime? deadline)
Expand Down Expand Up @@ -168,6 +170,8 @@ private void Cleanup(Status status)
ClientStreamReader?.HttpResponseTcs.TrySetCanceled();
}

Channel.FinishActiveCall(this);

_ctsRegistration?.Dispose();
_deadlineTimer?.Dispose();
HttpResponse?.Dispose();
Expand Down Expand Up @@ -459,7 +463,12 @@ private async ValueTask RunCall(HttpRequestMessage request, TimeSpan? timeout)

try
{
_httpResponseTask = Channel.HttpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, _callCts.Token);
// If a HttpClient has been specified then we need to call it with ResponseHeadersRead
// so that the response message is available for streaming
_httpResponseTask = (Channel.HttpInvoker is HttpClient httpClient)
? httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, _callCts.Token)
: Channel.HttpInvoker.SendAsync(request, _callCts.Token);

HttpResponse = await _httpResponseTask.ConfigureAwait(false);
}
catch (Exception ex)
Expand Down Expand Up @@ -768,11 +777,11 @@ private HttpRequestMessage CreateHttpRequestMessage(TimeSpan? timeout)
var headers = message.Headers;

// User agent is optional but recommended.
headers.Add(GrpcProtocolConstants.UserAgentHeader, GrpcProtocolConstants.UserAgentHeaderValue);
headers.TryAddWithoutValidation(GrpcProtocolConstants.UserAgentHeader, GrpcProtocolConstants.UserAgentHeaderValue);
// TE is required by some servers, e.g. C Core.
// A missing TE header results in servers aborting the gRPC call.
headers.Add(GrpcProtocolConstants.TEHeader, GrpcProtocolConstants.TEHeaderValue);
headers.Add(GrpcProtocolConstants.MessageAcceptEncodingHeader, Channel.MessageAcceptEncoding);
headers.TryAddWithoutValidation(GrpcProtocolConstants.TEHeader, GrpcProtocolConstants.TEHeaderValue);
headers.TryAddWithoutValidation(GrpcProtocolConstants.MessageAcceptEncodingHeader, Channel.MessageAcceptEncoding);

if (Options.Headers != null && Options.Headers.Count > 0)
{
Expand All @@ -788,7 +797,7 @@ private HttpRequestMessage CreateHttpRequestMessage(TimeSpan? timeout)
// grpc-internal-encoding-request is used in the client to set message compression.
// 'grpc-encoding' is sent even if WriteOptions.Flags = NoCompress. In that situation
// individual messages will not be written with compression.
headers.Add(GrpcProtocolConstants.MessageEncodingHeader, entry.Value);
headers.TryAddWithoutValidation(GrpcProtocolConstants.MessageEncodingHeader, entry.Value);
}
else
{
Expand All @@ -799,7 +808,7 @@ private HttpRequestMessage CreateHttpRequestMessage(TimeSpan? timeout)

if (timeout != null)
{
headers.Add(GrpcProtocolConstants.TimeoutHeader, GrpcProtocolHelpers.EncodeTimeout(timeout.Value.Ticks / TimeSpan.TicksPerMillisecond));
headers.TryAddWithoutValidation(GrpcProtocolConstants.TimeoutHeader, GrpcProtocolHelpers.EncodeTimeout(timeout.Value.Ticks / TimeSpan.TicksPerMillisecond));
}

return message;
Expand Down
2 changes: 1 addition & 1 deletion src/Grpc.Net.Client/Internal/GrpcProtocolHelpers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ internal async static Task ReadCredentialMetadata(
public static void AddHeader(HttpRequestHeaders headers, Metadata.Entry entry)
{
var value = entry.IsBinary ? Convert.ToBase64String(entry.ValueBytes) : entry.Value;
headers.Add(entry.Key, value);
headers.TryAddWithoutValidation(entry.Key, value);
}

public static string? GetHeaderValue(HttpHeaders? headers, string name)
Expand Down
82 changes: 37 additions & 45 deletions src/Grpc.Net.Client/Internal/StreamExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,44 +46,6 @@ private static Status CreateUnknownMessageEncodingMessageStatus(string unsupport
return new Status(StatusCode.Unimplemented, $"Unsupported grpc-encoding value '{unsupportedEncoding}'. Supported encodings: {string.Join(", ", supportedEncodings)}");
}

private static async Task<(int length, bool compressed)?> ReadHeaderAsync(Stream responseStream, Memory<byte> header, CancellationToken cancellationToken)
{
int read;
var received = 0;
while ((read = await responseStream.ReadAsync(header.Slice(received, GrpcProtocolConstants.HeaderSize - received), cancellationToken).ConfigureAwait(false)) > 0)
{
received += read;

if (received == GrpcProtocolConstants.HeaderSize)
{
break;
}
}

if (received < GrpcProtocolConstants.HeaderSize)
{
if (received == 0)
{
return null;
}

throw new InvalidDataException("Unexpected end of content while reading the message header.");
}

// Read the header first
// - 1 byte flag for compression
// - 4 bytes for the content length
var compressed = ReadCompressedFlag(header.Span[0]);
var length = BinaryPrimitives.ReadUInt32BigEndian(header.Span.Slice(1, 4));

if (length > int.MaxValue)
{
throw new InvalidDataException("Message too large.");
}

return ((int)length, compressed);
}

public static async ValueTask<TResponse?> ReadMessageAsync<TResponse>(
this Stream responseStream,
ILogger logger,
Expand All @@ -107,16 +69,34 @@ private static Status CreateUnknownMessageEncodingMessageStatus(string unsupport
// If the message is larger then the array will be replaced when the message size is known.
buffer = ArrayPool<byte>.Shared.Rent(minimumLength: 4096);

var headerDetails = await ReadHeaderAsync(responseStream, buffer, cancellationToken).ConfigureAwait(false);
int read;
JamesNK marked this conversation as resolved.
Show resolved Hide resolved
var received = 0;
while ((read = await responseStream.ReadAsync(buffer.AsMemory(received, GrpcProtocolConstants.HeaderSize - received), cancellationToken).ConfigureAwait(false)) > 0)
{
received += read;

if (headerDetails == null)
if (received == GrpcProtocolConstants.HeaderSize)
{
break;
}
}

if (received < GrpcProtocolConstants.HeaderSize)
{
GrpcCallLog.NoMessageReturned(logger);
return default;
if (received == 0)
{
GrpcCallLog.NoMessageReturned(logger);
return default;
}

throw new InvalidDataException("Unexpected end of content while reading the message header.");
}

var length = headerDetails.Value.length;
var compressed = headerDetails.Value.compressed;
// Read the header first
// - 1 byte flag for compression
// - 4 bytes for the content length
var compressed = ReadCompressedFlag(buffer[0]);
var length = ReadMessageLength(buffer.AsSpan(1, 4));

if (length > 0)
{
Expand Down Expand Up @@ -200,6 +180,18 @@ private static Status CreateUnknownMessageEncodingMessageStatus(string unsupport
}
}

private static int ReadMessageLength(Span<byte> header)
{
var length = BinaryPrimitives.ReadUInt32BigEndian(header);

if (length > int.MaxValue)
{
throw new InvalidDataException("Message too large.");
}

return (int)length;
}

private static async Task ReadMessageContent(Stream responseStream, Memory<byte> messageData, int length, CancellationToken cancellationToken)
{
// Read message content until content length is reached
Expand Down Expand Up @@ -257,7 +249,7 @@ private static bool ReadCompressedFlag(byte flag)
}
}

// TODO(JamesNK): Reuse serialization content between message writes. Improve client/duplex streaming allocations.
// TODO(JamesNK): Reuse serialization context between message writes. Improve client/duplex streaming allocations.
public static async ValueTask WriteMessageAsync<TMessage>(
this Stream stream,
ILogger logger,
Expand Down
Loading