Skip to content

Commit

Permalink
Tries triggering the race condition as described in grpc#2119
Browse files Browse the repository at this point in the history
  • Loading branch information
amanda-tarafa committed May 12, 2023
1 parent 38738af commit e21fed5
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 0 deletions.
10 changes: 10 additions & 0 deletions src/Grpc.Net.Client/GrpcChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -489,10 +489,16 @@ internal void RegisterActiveCall(IDisposable grpcCall)

internal void FinishActiveCall(IDisposable grpcCall)
{
Logger.LogDebug($"Finishing active call by thread: {Environment.CurrentManagedThreadId}");
lock (_lock)
{
// Maybe's here refer to the fact that if this is executed in the thread of channel.Dispose
// then the lock is neither acquired nor released as it's already beening hold by the Dispose
// thread, which will release it when it's done.
Logger.LogDebug($"Lock maybe acquired by thread: {Environment.CurrentManagedThreadId}");
ActiveCalls.Remove(grpcCall);
}
Logger.LogDebug($"Lock maybe released by thread: {Environment.CurrentManagedThreadId}");
}

internal GrpcMethodInfo GetCachedGrpcMethodInfo(IMethod method)
Expand Down Expand Up @@ -733,13 +739,16 @@ public Task WaitForStateChangedAsync(ConnectivityState lastObservedState, Cancel
/// </summary>
public void Dispose()
{
Logger.LogDebug($"Channel disposing by thread: {Environment.CurrentManagedThreadId}");

if (Disposed)
{
return;
}

lock (_lock)
{
Logger.LogDebug($"Lock acquired in dispose by thread {Environment.CurrentManagedThreadId}");
if (ActiveCalls.Count > 0)
{
// Disposing a call will remove it from ActiveCalls. Need to take a copy
Expand All @@ -752,6 +761,7 @@ public void Dispose()
}
}
}
Logger.LogDebug($"Lock released in disposed by thread {Environment.CurrentManagedThreadId}");

if (_shouldDisposeHttpClient)
{
Expand Down
8 changes: 8 additions & 0 deletions src/Grpc.Net.Client/Internal/GrpcCall.cs
Original file line number Diff line number Diff line change
Expand Up @@ -413,16 +413,19 @@ private void CancelCallFromCancellationToken(CancellationToken cancellationToken
{
using (StartScope())
{
Logger.LogDebug($"Cancel from cancellation token in thread: {Environment.CurrentManagedThreadId}");
CancelCall(GrpcProtocolConstants.CreateClientCanceledStatus(new OperationCanceledException(cancellationToken)));
}
}

private void CancelCall(Status status)
{
Logger.LogDebug($"Before try set result in thread: {Environment.CurrentManagedThreadId}");
// Set overall call status first. Status can be used in throw RpcException from cancellation.
// If response has successfully finished then the status will come from the trailers.
// If it didn't finish then complete with a status.
_callTcs.TrySetResult(status);
Logger.LogDebug($"After try set result in thread: {Environment.CurrentManagedThreadId}");

// Checking if cancellation has already happened isn't threadsafe
// but there is no adverse effect other than an extra log message
Expand Down Expand Up @@ -627,6 +630,11 @@ private async Task RunCall(HttpRequestMessage request, TimeSpan? timeout)
}
catch (Exception ex)
{
Logger.LogDebug($"Executing cancellation continuations in thread: {Environment.CurrentManagedThreadId}");
// Let's simulate we loose CPU here, and hopefully Dispose comes in and has to block on continuation callbacks beeing done.
Thread.Sleep(100);
Logger.LogDebug($"Coming back to cancellation continuations in thread: {Environment.CurrentManagedThreadId}");

ResolveException(ErrorStartingCallMessage, ex, out status, out var resolvedException);

finished = FinishCall(request, diagnosticSourceEnabled, activity, status.Value);
Expand Down
60 changes: 60 additions & 0 deletions test/Grpc.Net.Client.Tests/AsyncDuplexStreamingCallTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
using Grpc.Net.Client.Tests.Infrastructure;
using Grpc.Shared;
using Grpc.Tests.Shared;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using NUnit.Framework;

namespace Grpc.Net.Client.Tests;
Expand Down Expand Up @@ -177,4 +179,62 @@ await streamContent.AddDataAndWait(await ClientTestHelpers.GetResponseDataAsync(
Assert.IsTrue(moveNextTask4.IsCompleted);
Assert.IsFalse(await moveNextTask3.DefaultTimeout());
}

[Test]
public async Task AsyncDuplexStreamingCall_CancellationDisposeRace_Success()
{
// Arrange
var services = new ServiceCollection();
services.AddNUnitLogger();
var loggerFactory = services.BuildServiceProvider().GetRequiredService<ILoggerFactory>();
var logger = loggerFactory.CreateLogger(GetType());

for (int j = 0; j < 2000; j++)
{
logger.LogDebug("---------------------------------------------------------------");
var tcs = new TaskCompletionSource<HttpResponseMessage>();
var handler = TestHttpMessageHandler.Create(request =>
{
return tcs.Task;
});
var channel = GrpcChannel.ForAddress("http://localhost", new GrpcChannelOptions
{
HttpHandler = handler,
LoggerFactory = loggerFactory
});
var invoker = channel.CreateCallInvoker();
var actTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

var tasks = new List<Task>();
for (var i = 0; i < 1; i++)
{
var cts = new CancellationTokenSource();
var call = invoker.AsyncDuplexStreamingCall<HelloRequest, HelloReply>(ClientTestHelpers.ServiceMethod, string.Empty, new CallOptions(cancellationToken: cts.Token));
tasks.Add(Task.Run(async () =>
{
await actTcs.Task;
cts.Cancel();
}));
}

tasks.Add(Task.Run(async () =>
{
await actTcs.Task;
// Trying to get dispose to trigger in between all the cancellations.
await Task.Delay(7);
channel.Dispose();
}));

// Small pause to make sure we're waiting at the TCS everywhere.
await Task.Delay(50);

// Act
actTcs.SetResult(true);

// Assert
await Task.WhenAll(tasks).DefaultTimeout();
// So that we see all logs from a single run together.
await Task.Delay(1000);
}
}
}

0 comments on commit e21fed5

Please sign in to comment.