From 0c22fc006a194b6905e820cbcd56dc61d5aea033 Mon Sep 17 00:00:00 2001 From: James Newton-King Date: Tue, 5 May 2020 12:57:32 +1200 Subject: [PATCH 1/3] Add GrpcChannelOptions.HttpHandler and configure channel to default to invoker --- .../Client/UnaryClientBenchmarkBase.cs | 5 +- src/Grpc.Net.Client/GrpcChannel.cs | 67 +++++++---- src/Grpc.Net.Client/GrpcChannelOptions.cs | 35 +++++- src/Grpc.Net.Client/Internal/GrpcCall.cs | 19 +++- .../Internal/GrpcProtocolHelpers.cs | 2 +- .../Internal/StreamExtensions.cs | 42 ++++++- .../Grpc.Net.Client.Tests/GrpcChannelTests.cs | 105 +++++++++++++----- .../DefaultGrpcClientFactoryTests.cs | 2 +- 8 files changed, 208 insertions(+), 69 deletions(-) diff --git a/perf/Grpc.AspNetCore.Microbenchmarks/Client/UnaryClientBenchmarkBase.cs b/perf/Grpc.AspNetCore.Microbenchmarks/Client/UnaryClientBenchmarkBase.cs index 6c2e67f3c..bbd40271a 100644 --- a/perf/Grpc.AspNetCore.Microbenchmarks/Client/UnaryClientBenchmarkBase.cs +++ b/perf/Grpc.AspNetCore.Microbenchmarks/Client/UnaryClientBenchmarkBase.cs @@ -23,6 +23,7 @@ using System.Net; using System.Net.Http; using System.Net.Http.Headers; +using System.Threading; using System.Threading.Tasks; using BenchmarkDotNet.Attributes; using Greet; @@ -64,11 +65,9 @@ public void GlobalSetup() return ResponseUtils.CreateResponse(HttpStatusCode.OK, content, grpcEncoding: ResponseCompressionAlgorithm); }); - var httpClient = new HttpClient(handler); - var channel = GrpcChannel.ForAddress("http://localhost", new GrpcChannelOptions { - HttpClient = httpClient, + HttpHandler = handler, CompressionProviders = CompressionProviders }); diff --git a/src/Grpc.Net.Client/GrpcChannel.cs b/src/Grpc.Net.Client/GrpcChannel.cs index 52f60e6cd..e00b9c433 100644 --- a/src/Grpc.Net.Client/GrpcChannel.cs +++ b/src/Grpc.Net.Client/GrpcChannel.cs @@ -40,9 +40,11 @@ public sealed class GrpcChannel : ChannelBase, IDisposable private readonly ConcurrentDictionary _methodInfoCache; private readonly Func _createMethodInfoFunc; + // Internal for testing + internal readonly HashSet ActiveCalls; internal Uri Address { get; } - internal HttpClient HttpClient { get; } + internal HttpMessageInvoker HttpInvoker { get; } internal int? SendMaxMessageSize { get; } internal int? ReceiveMaxMessageSize { get; } internal ILoggerFactory LoggerFactory { get; } @@ -64,12 +66,13 @@ internal GrpcChannel(Uri address, GrpcChannelOptions channelOptions) : base(addr _methodInfoCache = new ConcurrentDictionary(); // Dispose the HttpClient if... - // 1. No client was specified and so the channel created the HttpClient itself - // 2. User has specified a client and set DisposeHttpClient to true - _shouldDisposeHttpClient = channelOptions.HttpClient == null || channelOptions.DisposeHttpClient; + // 1. No client/handler was specified and so the channel created the client itself + // 2. User has specified a client/handler and set DisposeHttpClient to true + _shouldDisposeHttpClient = (channelOptions.HttpClient == null && channelOptions.HttpHandler == null) + || channelOptions.DisposeHttpClient; Address = address; - HttpClient = channelOptions.HttpClient ?? CreateInternalHttpClient(); + HttpInvoker = channelOptions.HttpClient ?? CreateInternalHttpInvoker(channelOptions.HttpHandler); SendMaxMessageSize = channelOptions.MaxSendMessageSize; ReceiveMaxMessageSize = channelOptions.MaxReceiveMessageSize; CompressionProviders = ResolveCompressionProviders(channelOptions.CompressionProviders); @@ -77,6 +80,7 @@ internal GrpcChannel(Uri address, GrpcChannelOptions channelOptions) : base(addr LoggerFactory = channelOptions.LoggerFactory ?? NullLoggerFactory.Instance; ThrowOperationCanceledOnCancellation = channelOptions.ThrowOperationCanceledOnCancellation; _createMethodInfoFunc = CreateMethodInfo; + ActiveCalls = new HashSet(); if (channelOptions.Credentials != null) { @@ -90,21 +94,29 @@ internal GrpcChannel(Uri address, GrpcChannelOptions channelOptions) : base(addr } } - private static HttpClient CreateInternalHttpClient() + private static HttpMessageInvoker CreateInternalHttpInvoker(HttpMessageHandler? handler) { - var httpClient = new HttpClient(); - - // Long running server and duplex streaming gRPC requests may not - // return any messages for over 100 seconds, triggering a cancellation - // of HttpClient.SendAsync. Disable timeout in internally created - // HttpClient for channel. - // - // gRPC deadline should be the recommended way to timeout gRPC calls. - // - // https://github.com/dotnet/corefx/issues/41650 - httpClient.Timeout = Timeout.InfiniteTimeSpan; - - return httpClient; + // HttpMessageInvoker should always dispose handler if Disposed is called on it. + // Decision to dispose invoker is controlled by _shouldDisposeHttpClient. + var httpInvoker = new HttpMessageInvoker(handler ?? new HttpClientHandler(), disposeHandler: true); + + return httpInvoker; + } + + internal void RegisterActiveCall(IDisposable grpcCall) + { + lock (ActiveCalls) + { + ActiveCalls.Add(grpcCall); + } + } + + internal void FinishActiveCall(IDisposable grpcCall) + { + lock (ActiveCalls) + { + ActiveCalls.Remove(grpcCall); + } } internal GrpcMethodInfo GetCachedGrpcMethodInfo(IMethod method) @@ -261,6 +273,12 @@ public static GrpcChannel ForAddress(Uri address, GrpcChannelOptions channelOpti throw new ArgumentNullException(nameof(channelOptions)); } + if (channelOptions.HttpClient != null && channelOptions.HttpHandler != null) + { + throw new ArgumentException($"{nameof(GrpcChannelOptions.HttpClient)} and {nameof(GrpcChannelOptions.HttpHandler)} have been configured. " + + $"Only one HTTP caller can be specified."); + } + return new GrpcChannel(address, channelOptions); } @@ -275,9 +293,18 @@ public void Dispose() return; } + lock (ActiveCalls) + { + // Disposing calls will remove them from ActiveCalls + foreach (var activeCall in ActiveCalls) + { + activeCall.Dispose(); + } + } + if (_shouldDisposeHttpClient) { - HttpClient.Dispose(); + HttpInvoker.Dispose(); } Disposed = true; } diff --git a/src/Grpc.Net.Client/GrpcChannelOptions.cs b/src/Grpc.Net.Client/GrpcChannelOptions.cs index c7d9267cd..d325ea1eb 100644 --- a/src/Grpc.Net.Client/GrpcChannelOptions.cs +++ b/src/Grpc.Net.Client/GrpcChannelOptions.cs @@ -68,23 +68,46 @@ public sealed class GrpcChannelOptions public ILoggerFactory? LoggerFactory { get; set; } /// - /// Gets or sets the used by the channel. + /// Gets or sets the used by the channel to make HTTP calls. /// /// + /// /// By default a specified here will not be disposed with the channel. /// To dispose the with the channel you must set /// to true. + /// + /// + /// Only one HTTP caller can be specified for a channel. An error will be thrown if this is configured + /// together with . + /// /// public HttpClient? HttpClient { get; set; } /// - /// Gets or sets a value indicating whether the underlying should be disposed - /// when the instance is disposed. The default value is false. + /// Gets or sets the used by the channel to make HTTP calls. + /// + /// + /// + /// By default a specified here will not be disposed with the channel. + /// To dispose the with the channel you must set + /// to true. + /// + /// + /// Only one HTTP caller can be specified for a channel. An error will be thrown if this is configured + /// together with . + /// + /// + public HttpMessageHandler? HttpHandler { get; set; } + + /// + /// Gets or sets a value indicating whether the underlying or + /// should be disposed when the instance is disposed. + /// The default value is false. /// /// - /// This setting is used when a value is specified. If no value is provided - /// then the channel will create an instance that is always disposed when - /// the channel is disposed. + /// This setting is used when a or value is specified. + /// If they are not specified then the channel will create an internal HTTP caller that is always disposed + /// when the channel is disposed. /// public bool DisposeHttpClient { get; set; } diff --git a/src/Grpc.Net.Client/Internal/GrpcCall.cs b/src/Grpc.Net.Client/Internal/GrpcCall.cs index e5248293b..1f82e1a38 100644 --- a/src/Grpc.Net.Client/Internal/GrpcCall.cs +++ b/src/Grpc.Net.Client/Internal/GrpcCall.cs @@ -75,6 +75,8 @@ public GrpcCall(Method method, GrpcMethodInfo grpcMethodInf Channel = channel; Logger = channel.LoggerFactory.CreateLogger(LoggerName); _deadline = options.Deadline ?? DateTime.MaxValue; + + Channel.RegisterActiveCall(this); } private void ValidateDeadline(DateTime? deadline) @@ -459,7 +461,12 @@ private async ValueTask RunCall(HttpRequestMessage request, TimeSpan? timeout) try { - _httpResponseTask = Channel.HttpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, _callCts.Token); + // If a HttpClient has been specified then we need to call it with ResponseHeadersRead + // so that the response message is available for streaming + _httpResponseTask = (Channel.HttpInvoker is HttpClient httpClient) + ? httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, _callCts.Token) + : Channel.HttpInvoker.SendAsync(request, _callCts.Token); + HttpResponse = await _httpResponseTask.ConfigureAwait(false); } catch (Exception ex) @@ -768,11 +775,11 @@ private HttpRequestMessage CreateHttpRequestMessage(TimeSpan? timeout) var headers = message.Headers; // User agent is optional but recommended. - headers.Add(GrpcProtocolConstants.UserAgentHeader, GrpcProtocolConstants.UserAgentHeaderValue); + headers.TryAddWithoutValidation(GrpcProtocolConstants.UserAgentHeader, GrpcProtocolConstants.UserAgentHeaderValue); // TE is required by some servers, e.g. C Core. // A missing TE header results in servers aborting the gRPC call. - headers.Add(GrpcProtocolConstants.TEHeader, GrpcProtocolConstants.TEHeaderValue); - headers.Add(GrpcProtocolConstants.MessageAcceptEncodingHeader, Channel.MessageAcceptEncoding); + headers.TryAddWithoutValidation(GrpcProtocolConstants.TEHeader, GrpcProtocolConstants.TEHeaderValue); + headers.TryAddWithoutValidation(GrpcProtocolConstants.MessageAcceptEncodingHeader, Channel.MessageAcceptEncoding); if (Options.Headers != null && Options.Headers.Count > 0) { @@ -788,7 +795,7 @@ private HttpRequestMessage CreateHttpRequestMessage(TimeSpan? timeout) // grpc-internal-encoding-request is used in the client to set message compression. // 'grpc-encoding' is sent even if WriteOptions.Flags = NoCompress. In that situation // individual messages will not be written with compression. - headers.Add(GrpcProtocolConstants.MessageEncodingHeader, entry.Value); + headers.TryAddWithoutValidation(GrpcProtocolConstants.MessageEncodingHeader, entry.Value); } else { @@ -799,7 +806,7 @@ private HttpRequestMessage CreateHttpRequestMessage(TimeSpan? timeout) if (timeout != null) { - headers.Add(GrpcProtocolConstants.TimeoutHeader, GrpcProtocolHelpers.EncodeTimeout(timeout.Value.Ticks / TimeSpan.TicksPerMillisecond)); + headers.TryAddWithoutValidation(GrpcProtocolConstants.TimeoutHeader, GrpcProtocolHelpers.EncodeTimeout(timeout.Value.Ticks / TimeSpan.TicksPerMillisecond)); } return message; diff --git a/src/Grpc.Net.Client/Internal/GrpcProtocolHelpers.cs b/src/Grpc.Net.Client/Internal/GrpcProtocolHelpers.cs index a047765eb..d0c54d8d3 100644 --- a/src/Grpc.Net.Client/Internal/GrpcProtocolHelpers.cs +++ b/src/Grpc.Net.Client/Internal/GrpcProtocolHelpers.cs @@ -269,7 +269,7 @@ internal async static Task ReadCredentialMetadata( public static void AddHeader(HttpRequestHeaders headers, Metadata.Entry entry) { var value = entry.IsBinary ? Convert.ToBase64String(entry.ValueBytes) : entry.Value; - headers.Add(entry.Key, value); + headers.TryAddWithoutValidation(entry.Key, value); } public static string? GetHeaderValue(HttpHeaders? headers, string name) diff --git a/src/Grpc.Net.Client/Internal/StreamExtensions.cs b/src/Grpc.Net.Client/Internal/StreamExtensions.cs index 3e3fef79e..ac6ed2aee 100644 --- a/src/Grpc.Net.Client/Internal/StreamExtensions.cs +++ b/src/Grpc.Net.Client/Internal/StreamExtensions.cs @@ -107,16 +107,34 @@ private static Status CreateUnknownMessageEncodingMessageStatus(string unsupport // If the message is larger then the array will be replaced when the message size is known. buffer = ArrayPool.Shared.Rent(minimumLength: 4096); - var headerDetails = await ReadHeaderAsync(responseStream, buffer, cancellationToken).ConfigureAwait(false); + int read; + var received = 0; + while ((read = await responseStream.ReadAsync(buffer.AsMemory(received, GrpcProtocolConstants.HeaderSize - received), cancellationToken).ConfigureAwait(false)) > 0) + { + received += read; + + if (received == GrpcProtocolConstants.HeaderSize) + { + break; + } + } - if (headerDetails == null) + if (received < GrpcProtocolConstants.HeaderSize) { - GrpcCallLog.NoMessageReturned(logger); - return default; + if (received == 0) + { + GrpcCallLog.NoMessageReturned(logger); + return default; + } + + throw new InvalidDataException("Unexpected end of content while reading the message header."); } - var length = headerDetails.Value.length; - var compressed = headerDetails.Value.compressed; + // Read the header first + // - 1 byte flag for compression + // - 4 bytes for the content length + var compressed = ReadCompressedFlag(buffer[0]); + var length = ReadMessageLength(buffer.AsSpan(1, 4)); if (length > 0) { @@ -200,6 +218,18 @@ private static Status CreateUnknownMessageEncodingMessageStatus(string unsupport } } + private static int ReadMessageLength(Span header) + { + var length = BinaryPrimitives.ReadUInt32BigEndian(header); + + if (length > int.MaxValue) + { + throw new InvalidDataException("Message too large."); + } + + return (int)length; + } + private static async Task ReadMessageContent(Stream responseStream, Memory messageData, int length, CancellationToken cancellationToken) { // Read message content until content length is reached diff --git a/test/Grpc.Net.Client.Tests/GrpcChannelTests.cs b/test/Grpc.Net.Client.Tests/GrpcChannelTests.cs index 3d32cd5fc..de01d6c99 100644 --- a/test/Grpc.Net.Client.Tests/GrpcChannelTests.cs +++ b/test/Grpc.Net.Client.Tests/GrpcChannelTests.cs @@ -20,6 +20,7 @@ using System.Net.Http; using System.Threading; using System.Threading.Tasks; +using Greet; using Grpc.Core; using Grpc.Tests.Shared; using NUnit.Framework; @@ -42,16 +43,6 @@ public void Build_SslCredentialsWithHttps_Success() Assert.IsTrue(channel.IsSecure); } - [Test] - public void Build_NoHttpClient_InternalHttpClientHasInfiniteTimeout() - { - // Arrange & Act - var channel = GrpcChannel.ForAddress("https://localhost"); - - // Assert - Assert.AreEqual(Timeout.InfiniteTimeSpan, channel.HttpClient.Timeout); - } - [Test] public void Build_SslCredentialsWithHttp_ThrowsError() { @@ -107,6 +98,20 @@ public void Build_InsecureCredentialsWithHttps_ThrowsError() Assert.AreEqual("Channel is configured with insecure channel credentials and can't use a HttpClient with a 'https' scheme.", ex.Message); } + [Test] + public void Build_HttpClientAndHttpHandler_ThrowsError() + { + // Arrange & Act + var ex = Assert.Throws(() => GrpcChannel.ForAddress("https://localhost", new GrpcChannelOptions + { + HttpClient = new HttpClient(), + HttpHandler = new HttpClientHandler() + })); + + // Assert + Assert.AreEqual("HttpClient and HttpHandler have been configured. Only one HTTP caller can be specified.", ex.Message); + } + [Test] public void Dispose_NotCalled_NotDisposed() { @@ -130,7 +135,7 @@ public void Dispose_Called_Disposed() // Assert Assert.IsTrue(channel.Disposed); - Assert.Throws(() => channel.HttpClient.CancelPendingRequests()); + Assert.Throws(() => channel.HttpInvoker.SendAsync(new HttpRequestMessage(), CancellationToken.None)); } [Test] @@ -174,21 +179,6 @@ public async Task Dispose_StartCallOnClient_ThrowError() await ExceptionAssert.ThrowsAsync(() => client.SayHelloAsync(new Greet.HelloRequest()).ResponseAsync); } - public class TestHttpMessageHandler : HttpMessageHandler - { - public bool Disposed { get; private set; } - - protected override Task SendAsync(HttpRequestMessage request, CancellationToken cancellationToken) - { - throw new NotImplementedException(); - } - - protected override void Dispose(bool disposing) - { - Disposed = true; - } - } - [Test] public void Dispose_CalledWhenHttpClientSpecified_HttpClientNotDisposed() { @@ -227,5 +217,68 @@ public void Dispose_CalledWhenHttpClientSpecifiedAndHttpClientDisposedTrue_HttpC Assert.IsTrue(channel.Disposed); Assert.IsTrue(handler.Disposed); } + + [Test] + public void Dispose_CalledWhenHttpMessageHandlerSpecifiedAndHttpClientDisposedTrue_HttpClientDisposed() + { + // Arrange + var handler = new TestHttpMessageHandler(); + var channel = GrpcChannel.ForAddress("https://localhost", new GrpcChannelOptions + { + HttpHandler = handler, + DisposeHttpClient = true + }); + + // Act + channel.Dispose(); + + // Assert + Assert.IsTrue(channel.Disposed); + Assert.IsTrue(handler.Disposed); + } + + [Test] + public async Task Dispose_CalledWhileActiveCalls_ActiveCallsDisposed() + { + // Arrange + var handler = new TestHttpMessageHandler(); + var channel = GrpcChannel.ForAddress("https://localhost", new GrpcChannelOptions + { + HttpHandler = handler + }); + + var client = new Greeter.GreeterClient(channel); + var call = client.SayHelloAsync(new HelloRequest()); + + var exTask = ExceptionAssert.ThrowsAsync(() => call.ResponseAsync); + Assert.IsFalse(exTask.IsCompleted); + + // Act + channel.Dispose(); + + // Assert + var ex = await exTask.DefaultTimeout(); + Assert.AreEqual(StatusCode.Cancelled, ex.StatusCode); + Assert.AreEqual("gRPC call disposed.", ex.Status.Detail); + + Assert.IsTrue(channel.Disposed); + } + + public class TestHttpMessageHandler : HttpMessageHandler + { + public bool Disposed { get; private set; } + + protected override async Task SendAsync(HttpRequestMessage request, CancellationToken cancellationToken) + { + var tcs = new TaskCompletionSource(); + cancellationToken.Register(s => ((TaskCompletionSource)s!).SetException(new OperationCanceledException()), tcs); + return await tcs.Task; + } + + protected override void Dispose(bool disposing) + { + Disposed = true; + } + } } } diff --git a/test/Grpc.Net.ClientFactory.Tests/DefaultGrpcClientFactoryTests.cs b/test/Grpc.Net.ClientFactory.Tests/DefaultGrpcClientFactoryTests.cs index 6661a1c45..907b491dd 100644 --- a/test/Grpc.Net.ClientFactory.Tests/DefaultGrpcClientFactoryTests.cs +++ b/test/Grpc.Net.ClientFactory.Tests/DefaultGrpcClientFactoryTests.cs @@ -56,7 +56,7 @@ public void CreateClient_Default_InternalHttpClientHasInfiniteTimeout() var client = clientFactory.CreateClient(nameof(TestGreeterClient)); // Assert - Assert.AreEqual(Timeout.InfiniteTimeSpan, client.CallInvoker.Channel.HttpClient.Timeout); + Assert.AreEqual(Timeout.InfiniteTimeSpan, ((HttpClient)client.CallInvoker.Channel.HttpInvoker).Timeout); } [Test] From da88cde806488f61ef532c228500b95524f3e8ec Mon Sep 17 00:00:00 2001 From: James Newton-King Date: Tue, 5 May 2020 13:45:57 +1200 Subject: [PATCH 2/3] Remove completed calls from channel's active calls collection --- .../Client/UnaryClientBenchmarkBase.cs | 1 - src/Grpc.Net.Client/GrpcChannel.cs | 15 ++-- src/Grpc.Net.Client/Internal/GrpcCall.cs | 2 + .../Client/CancellationTests.cs | 69 +++++++++++++++++++ test/FunctionalTests/FunctionalTestBase.cs | 18 +++-- .../Infrastructure/GrpcTestFixture.cs | 13 +++- .../Infrastructure/LogRecord.cs | 2 + .../Grpc.Net.Client.Tests/GrpcChannelTests.cs | 2 + testassets/Shared/InteropClient.cs | 4 +- 9 files changed, 111 insertions(+), 15 deletions(-) diff --git a/perf/Grpc.AspNetCore.Microbenchmarks/Client/UnaryClientBenchmarkBase.cs b/perf/Grpc.AspNetCore.Microbenchmarks/Client/UnaryClientBenchmarkBase.cs index bbd40271a..342312eae 100644 --- a/perf/Grpc.AspNetCore.Microbenchmarks/Client/UnaryClientBenchmarkBase.cs +++ b/perf/Grpc.AspNetCore.Microbenchmarks/Client/UnaryClientBenchmarkBase.cs @@ -23,7 +23,6 @@ using System.Net; using System.Net.Http; using System.Net.Http.Headers; -using System.Threading; using System.Threading.Tasks; using BenchmarkDotNet.Attributes; using Greet; diff --git a/src/Grpc.Net.Client/GrpcChannel.cs b/src/Grpc.Net.Client/GrpcChannel.cs index e00b9c433..03f4eb0bf 100644 --- a/src/Grpc.Net.Client/GrpcChannel.cs +++ b/src/Grpc.Net.Client/GrpcChannel.cs @@ -19,6 +19,7 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; +using System.Linq; using System.Net.Http; using System.Threading; using Grpc.Core; @@ -65,7 +66,7 @@ internal GrpcChannel(Uri address, GrpcChannelOptions channelOptions) : base(addr { _methodInfoCache = new ConcurrentDictionary(); - // Dispose the HttpClient if... + // Dispose the HTTP client/handler if... // 1. No client/handler was specified and so the channel created the client itself // 2. User has specified a client/handler and set DisposeHttpClient to true _shouldDisposeHttpClient = (channelOptions.HttpClient == null && channelOptions.HttpHandler == null) @@ -295,10 +296,16 @@ public void Dispose() lock (ActiveCalls) { - // Disposing calls will remove them from ActiveCalls - foreach (var activeCall in ActiveCalls) + if (ActiveCalls.Count > 0) { - activeCall.Dispose(); + // Disposing a call will remove it from ActiveCalls. Need to take a copy + // to avoid enumeration from being modified + var activeCallsCopy = ActiveCalls.ToArray(); + + foreach (var activeCall in activeCallsCopy) + { + activeCall.Dispose(); + } } } diff --git a/src/Grpc.Net.Client/Internal/GrpcCall.cs b/src/Grpc.Net.Client/Internal/GrpcCall.cs index 1f82e1a38..e28dbd621 100644 --- a/src/Grpc.Net.Client/Internal/GrpcCall.cs +++ b/src/Grpc.Net.Client/Internal/GrpcCall.cs @@ -170,6 +170,8 @@ private void Cleanup(Status status) ClientStreamReader?.HttpResponseTcs.TrySetCanceled(); } + Channel.FinishActiveCall(this); + _ctsRegistration?.Dispose(); _deadlineTimer?.Dispose(); HttpResponse?.Dispose(); diff --git a/test/FunctionalTests/Client/CancellationTests.cs b/test/FunctionalTests/Client/CancellationTests.cs index f829547dd..40bac510a 100644 --- a/test/FunctionalTests/Client/CancellationTests.cs +++ b/test/FunctionalTests/Client/CancellationTests.cs @@ -190,6 +190,75 @@ await TestHelpers.AssertIsTrueRetryAsync( "Missing client cancellation log.").DefaultTimeout(); } + [Test] + public async Task ServerStreaming_ChannelDisposed_CancellationSentToServer() + { + var syncPoint = new SyncPoint(); + var serverCompleteTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + async Task ServerStreamingCall(DataMessage request, IServerStreamWriter streamWriter, ServerCallContext context) + { + await syncPoint.WaitToContinue().DefaultTimeout(); + + // Wait until the client cancels + while (!context.CancellationToken.IsCancellationRequested) + { + await Task.Delay(TimeSpan.FromMilliseconds(10)); + } + + serverCompleteTcs.TrySetResult(null); + } + + // Arrange + SetExpectedErrorsFilter(writeContext => + { + // Kestrel cancellation error message + if (writeContext.Exception is IOException && + writeContext.Exception.Message == "The client reset the request stream.") + { + return true; + } + + if (writeContext.LoggerName == "Grpc.Net.Client.Internal.GrpcCall" && + writeContext.EventId.Name == "ErrorStartingCall" && + writeContext.Message == "Error starting gRPC call.") + { + return true; + } + + // Ignore all logging related errors for now + return false; + }); + + var method = Fixture.DynamicGrpc.AddServerStreamingMethod(ServerStreamingCall); + + var channel = CreateChannel(useHandler: true); + + var client = TestClientFactory.Create(channel, method); + + // Act + var call = client.ServerStreamingCall(new DataMessage()); + await syncPoint.WaitForSyncPoint(); + syncPoint.Continue(); + + // Assert + Assert.AreEqual(1, channel.ActiveCalls.Count); + var moveNextTask = call.ResponseStream.MoveNext(CancellationToken.None); + + channel.Dispose(); + + Assert.AreEqual(0, channel.ActiveCalls.Count); + + var ex = await ExceptionAssert.ThrowsAsync(() => moveNextTask).DefaultTimeout(); + Assert.AreEqual(StatusCode.Cancelled, ex.StatusCode); + + await serverCompleteTcs.Task.DefaultTimeout(); + + await TestHelpers.AssertIsTrueRetryAsync( + () => HasLog(LogLevel.Information, "GrpcStatusError", "Call failed with gRPC error status. Status code: 'Cancelled', Message: 'gRPC call disposed.'."), + "Missing client cancellation log.").DefaultTimeout(); + } + [Test] public async Task ServerStreaming_CancellationOnClientAfterResponseHeadersReceived_CancellationSentToServer() { diff --git a/test/FunctionalTests/FunctionalTestBase.cs b/test/FunctionalTests/FunctionalTestBase.cs index 957b5024d..4ad7c2f54 100644 --- a/test/FunctionalTests/FunctionalTestBase.cs +++ b/test/FunctionalTests/FunctionalTestBase.cs @@ -41,13 +41,21 @@ public class FunctionalTestBase protected GrpcChannel Channel => _channel ??= CreateChannel(); - protected GrpcChannel CreateChannel() + protected GrpcChannel CreateChannel(bool useHandler = false) { - return GrpcChannel.ForAddress(Fixture.Client.BaseAddress, new GrpcChannelOptions + var options = new GrpcChannelOptions { - LoggerFactory = LoggerFactory, - HttpClient = Fixture.Client - }); + LoggerFactory = LoggerFactory + }; + if (useHandler) + { + options.HttpHandler = Fixture.Handler; + } + else + { + options.HttpClient = Fixture.Client; + } + return GrpcChannel.ForAddress(Fixture.Client.BaseAddress, options); } protected virtual void ConfigureServices(IServiceCollection services) { } diff --git a/test/FunctionalTests/Infrastructure/GrpcTestFixture.cs b/test/FunctionalTests/Infrastructure/GrpcTestFixture.cs index e988bf257..b6748f44c 100644 --- a/test/FunctionalTests/Infrastructure/GrpcTestFixture.cs +++ b/test/FunctionalTests/Infrastructure/GrpcTestFixture.cs @@ -52,15 +52,21 @@ public GrpcTestFixture(Action? initialConfigureServices = nu AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true); - Client = CreateClient(); + (Client, Handler) = CreateHttpCore(); } public ILoggerFactory LoggerFactory { get; } public DynamicGrpcServiceRegistry DynamicGrpc { get; } + public HttpMessageHandler Handler { get; } public HttpClient Client { get; } public HttpClient CreateClient(TestServerEndpointName? endpointName = null, DelegatingHandler? messageHandler = null) + { + return CreateHttpCore(endpointName, messageHandler).client; + } + + private (HttpClient client, HttpMessageHandler handler) CreateHttpCore(TestServerEndpointName? endpointName = null, DelegatingHandler? messageHandler = null) { endpointName ??= TestServerEndpointName.Http2; @@ -68,13 +74,16 @@ public HttpClient CreateClient(TestServerEndpointName? endpointName = null, Dele httpClientHandler.ServerCertificateCustomValidationCallback = HttpClientHandler.DangerousAcceptAnyServerCertificateValidator; HttpClient client; + HttpMessageHandler handler; if (messageHandler != null) { messageHandler.InnerHandler = httpClientHandler; + handler = messageHandler; client = new HttpClient(messageHandler); } else { + handler = httpClientHandler; client = new HttpClient(httpClientHandler); } @@ -94,7 +103,7 @@ public HttpClient CreateClient(TestServerEndpointName? endpointName = null, Dele throw new ArgumentException("Unexpected value: " + endpointName, nameof(endpointName)); } - return client; + return (client, handler); } internal event Action ServerLogged diff --git a/test/FunctionalTests/Infrastructure/LogRecord.cs b/test/FunctionalTests/Infrastructure/LogRecord.cs index bd94b1f89..579ee014b 100644 --- a/test/FunctionalTests/Infrastructure/LogRecord.cs +++ b/test/FunctionalTests/Infrastructure/LogRecord.cs @@ -17,10 +17,12 @@ #endregion using System; +using System.Diagnostics; using Microsoft.Extensions.Logging; namespace Grpc.AspNetCore.FunctionalTests.Infrastructure { + [DebuggerDisplay("{Message,nq}")] public class LogRecord { public LogRecord(DateTime timestamp, LogLevel logLevel, EventId eventId, object state, Exception? exception, Func formatter, string loggerName) diff --git a/test/Grpc.Net.Client.Tests/GrpcChannelTests.cs b/test/Grpc.Net.Client.Tests/GrpcChannelTests.cs index de01d6c99..977513998 100644 --- a/test/Grpc.Net.Client.Tests/GrpcChannelTests.cs +++ b/test/Grpc.Net.Client.Tests/GrpcChannelTests.cs @@ -252,6 +252,7 @@ public async Task Dispose_CalledWhileActiveCalls_ActiveCallsDisposed() var exTask = ExceptionAssert.ThrowsAsync(() => call.ResponseAsync); Assert.IsFalse(exTask.IsCompleted); + Assert.AreEqual(1, channel.ActiveCalls.Count); // Act channel.Dispose(); @@ -262,6 +263,7 @@ public async Task Dispose_CalledWhileActiveCalls_ActiveCallsDisposed() Assert.AreEqual("gRPC call disposed.", ex.Status.Detail); Assert.IsTrue(channel.Disposed); + Assert.AreEqual(0, channel.ActiveCalls.Count); } public class TestHttpMessageHandler : HttpMessageHandler diff --git a/testassets/Shared/InteropClient.cs b/testassets/Shared/InteropClient.cs index ba4acab00..129e9ac3e 100644 --- a/testassets/Shared/InteropClient.cs +++ b/testassets/Shared/InteropClient.cs @@ -196,12 +196,10 @@ private async Task HttpClientCreateChannel() httpMessageHandler = httpClientHandler; } - var httpClient = new HttpClient(httpMessageHandler); - var channel = GrpcChannel.ForAddress($"{scheme}://{options.ServerHost}:{options.ServerPort}", new GrpcChannelOptions { Credentials = credentials, - HttpClient = httpClient, + HttpHandler = httpMessageHandler, LoggerFactory = loggerFactory }); From b0575a687ead742a77f9fd5f8b8e36526c24be5c Mon Sep 17 00:00:00 2001 From: James Newton-King Date: Tue, 5 May 2020 20:47:32 +1200 Subject: [PATCH 3/3] PR feedback --- .../Internal/StreamExtensions.cs | 40 +------------------ 1 file changed, 1 insertion(+), 39 deletions(-) diff --git a/src/Grpc.Net.Client/Internal/StreamExtensions.cs b/src/Grpc.Net.Client/Internal/StreamExtensions.cs index ac6ed2aee..4f45c8fc8 100644 --- a/src/Grpc.Net.Client/Internal/StreamExtensions.cs +++ b/src/Grpc.Net.Client/Internal/StreamExtensions.cs @@ -46,44 +46,6 @@ private static Status CreateUnknownMessageEncodingMessageStatus(string unsupport return new Status(StatusCode.Unimplemented, $"Unsupported grpc-encoding value '{unsupportedEncoding}'. Supported encodings: {string.Join(", ", supportedEncodings)}"); } - private static async Task<(int length, bool compressed)?> ReadHeaderAsync(Stream responseStream, Memory header, CancellationToken cancellationToken) - { - int read; - var received = 0; - while ((read = await responseStream.ReadAsync(header.Slice(received, GrpcProtocolConstants.HeaderSize - received), cancellationToken).ConfigureAwait(false)) > 0) - { - received += read; - - if (received == GrpcProtocolConstants.HeaderSize) - { - break; - } - } - - if (received < GrpcProtocolConstants.HeaderSize) - { - if (received == 0) - { - return null; - } - - throw new InvalidDataException("Unexpected end of content while reading the message header."); - } - - // Read the header first - // - 1 byte flag for compression - // - 4 bytes for the content length - var compressed = ReadCompressedFlag(header.Span[0]); - var length = BinaryPrimitives.ReadUInt32BigEndian(header.Span.Slice(1, 4)); - - if (length > int.MaxValue) - { - throw new InvalidDataException("Message too large."); - } - - return ((int)length, compressed); - } - public static async ValueTask ReadMessageAsync( this Stream responseStream, ILogger logger, @@ -287,7 +249,7 @@ private static bool ReadCompressedFlag(byte flag) } } - // TODO(JamesNK): Reuse serialization content between message writes. Improve client/duplex streaming allocations. + // TODO(JamesNK): Reuse serialization context between message writes. Improve client/duplex streaming allocations. public static async ValueTask WriteMessageAsync( this Stream stream, ILogger logger,