Skip to content

Commit

Permalink
Reproduces the deadlock by the race condition described in grpc#2119
Browse files Browse the repository at this point in the history
  • Loading branch information
amanda-tarafa committed May 12, 2023
1 parent 38738af commit 3a08267
Showing 1 changed file with 80 additions and 0 deletions.
80 changes: 80 additions & 0 deletions test/Grpc.Net.Client.Tests/AsyncDuplexStreamingCallTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
using Grpc.Net.Client.Tests.Infrastructure;
using Grpc.Shared;
using Grpc.Tests.Shared;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using NUnit.Framework;

namespace Grpc.Net.Client.Tests;
Expand Down Expand Up @@ -177,4 +179,82 @@ await streamContent.AddDataAndWait(await ClientTestHelpers.GetResponseDataAsync(
Assert.IsTrue(moveNextTask4.IsCompleted);
Assert.IsFalse(await moveNextTask3.DefaultTimeout());
}

[Test]
public async Task AsyncDuplexStreamingCall_CancellationDisposeRace_Success()
{
// Arrange
var services = new ServiceCollection();
services.AddNUnitLogger();
var loggerFactory = services.BuildServiceProvider().GetRequiredService<ILoggerFactory>();
var logger = loggerFactory.CreateLogger(GetType());

for (int i = 0; i < 20; i++)
{
// Let's mimic a real call first to get GrpcCall.RunCall where we need to for reproducing the deadlock.
var streamContent = new SyncPointMemoryStream();
var requestContentTcs = new TaskCompletionSource<Task<Stream>>(TaskCreationOptions.RunContinuationsAsynchronously);

PushStreamContent<HelloRequest, HelloReply>? content = null;

var handler = TestHttpMessageHandler.Create(async request =>
{
content = (PushStreamContent<HelloRequest, HelloReply>)request.Content!;
var streamTask = content.ReadAsStreamAsync();
requestContentTcs.SetResult(streamTask);
// Wait for RequestStream.CompleteAsync()
await streamTask;
return ResponseUtils.CreateResponse(HttpStatusCode.OK, new StreamContent(streamContent));
});
var channel = GrpcChannel.ForAddress("http://localhost", new GrpcChannelOptions
{
HttpHandler = handler,
LoggerFactory = loggerFactory
});
var invoker = channel.CreateCallInvoker();

var cts = new CancellationTokenSource();

var call = invoker.AsyncDuplexStreamingCall<HelloRequest, HelloReply>(ClientTestHelpers.ServiceMethod, string.Empty, new CallOptions(cancellationToken: cts.Token));
await call.RequestStream.WriteAsync(new HelloRequest { Name = "1" }).DefaultTimeout();
await call.RequestStream.CompleteAsync().DefaultTimeout();

// Let's read a response
var deserializationContext = new DefaultDeserializationContext();
var requestContent = await await requestContentTcs.Task.DefaultTimeout();
var requestMessage = await StreamSerializationHelper.ReadMessageAsync(
requestContent,
ClientTestHelpers.ServiceMethod.RequestMarshaller.ContextualDeserializer,
GrpcProtocolConstants.IdentityGrpcEncoding,
maximumMessageSize: null,
GrpcProtocolConstants.DefaultCompressionProviders,
singleMessage: false,
CancellationToken.None).DefaultTimeout();
Assert.AreEqual("1", requestMessage!.Name);

var actTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

var cancellationTask = Task.Run(async () =>
{
await actTcs.Task;
cts.Cancel();
});
var disposingTask = Task.Run(async () =>
{
await actTcs.Task;
channel.Dispose();
});

// Small pause to make sure we're waiting at the TCS everywhere.
await Task.Delay(50);

// Act
actTcs.SetResult(true);

// Assert
await Task.WhenAll(cancellationTask, disposingTask).DefaultTimeout();
}
}
}

0 comments on commit 3a08267

Please sign in to comment.