diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft.Data.SqlClient.csproj b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft.Data.SqlClient.csproj index 381db79cac..196067b5c2 100644 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft.Data.SqlClient.csproj +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft.Data.SqlClient.csproj @@ -344,6 +344,7 @@ + @@ -393,6 +394,8 @@ + + @@ -522,6 +525,7 @@ Common\CoreLib\System\Threading\Tasks\TaskToApm.cs + @@ -533,8 +537,8 @@ - + @@ -826,6 +830,7 @@ + True True diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/ConcurrentQueueSemaphore.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/ConcurrentQueueSemaphore.cs new file mode 100644 index 0000000000..46d3b70a25 --- /dev/null +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/ConcurrentQueueSemaphore.cs @@ -0,0 +1,60 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System; +using System.Collections.Concurrent; +using System.Threading; +using System.Threading.Tasks; + +namespace Microsoft.Data.SqlClient.SNI +{ + /// + /// This class implements a FIFO Queue with SemaphoreSlim for ordered execution of parallel tasks. + /// Currently used in Managed SNI (SNISslStream) to override SslStream's WriteAsync implementation. + /// + internal sealed partial class ConcurrentQueueSemaphore + { + private readonly SemaphoreSlim _semaphore; + private readonly ConcurrentQueue> _queue; + + public ConcurrentQueueSemaphore(int initialCount) + { + _semaphore = new SemaphoreSlim(initialCount); + _queue = new ConcurrentQueue>(); + } + + public Task WaitAsync(CancellationToken cancellationToken) + { + // try sync wait with 0 which will not block to see if we need to do an async wait + if (_semaphore.Wait(0, cancellationToken)) + { + return Task.CompletedTask; + } + else + { + var tcs = new TaskCompletionSource(); + _queue.Enqueue(tcs); + _semaphore.WaitAsync().ContinueWith( + continuationAction: static (Task task, object state) => + { + ConcurrentQueue> queue = (ConcurrentQueue>)state; + if (queue.TryDequeue(out TaskCompletionSource popped)) + { + popped.SetResult(true); + } + }, + state: _queue, + cancellationToken: cancellationToken + ); + return tcs.Task; + } + } + + public void Release() + { + _semaphore.Release(); + } + } + +} diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNIStreams.Task.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNIStreams.Task.cs new file mode 100644 index 0000000000..74b1206fa5 --- /dev/null +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNIStreams.Task.cs @@ -0,0 +1,73 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System.Threading; +using System.Threading.Tasks; + +namespace Microsoft.Data.SqlClient.SNI +{ + // NetCore2.1: + // DO NOT OVERRIDE ValueTask versions of ReadAsync and WriteAsync because the underlying SslStream implements them + // by calling the Task versions which are already overridden meaning that if a caller uses Task WriteAsync this would + // call ValueTask WriteAsync which then called TaskWriteAsync introducing a lock cycle and never return + + internal sealed partial class SNISslStream + { + public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + await _readAsyncSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + try + { + return await base.ReadAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false); + } + finally + { + _readAsyncSemaphore.Release(); + } + } + + public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + await _writeAsyncSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + try + { + await base.WriteAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false); + } + finally + { + _writeAsyncSemaphore.Release(); + } + } + } + + internal sealed partial class SNINetworkStream + { + public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + await _readAsyncSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + try + { + return await base.ReadAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false); + } + finally + { + _readAsyncSemaphore.Release(); + } + } + + // Prevent the WriteAsync collisions by running the task in a Semaphore Slim + public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + await _writeAsyncSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + try + { + await base.WriteAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false); + } + finally + { + _writeAsyncSemaphore.Release(); + } + } + } +} diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNIStreams.ValueTask.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNIStreams.ValueTask.cs new file mode 100644 index 0000000000..5779304608 --- /dev/null +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNIStreams.ValueTask.cs @@ -0,0 +1,89 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System.Threading; +using System.Threading.Tasks; +using System; + +namespace Microsoft.Data.SqlClient.SNI +{ + internal sealed partial class SNISslStream + { + public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + return ReadAsync(new Memory(buffer, offset, count), cancellationToken).AsTask(); + } + + public override async ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default) + { + await _readAsyncSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + try + { + return await base.ReadAsync(buffer, cancellationToken).ConfigureAwait(false); + } + finally + { + _readAsyncSemaphore.Release(); + } + } + + public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + return WriteAsync(new Memory(buffer, offset, count), cancellationToken).AsTask(); + } + + public override async ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) + { + await _writeAsyncSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + try + { + await base.WriteAsync(buffer, cancellationToken).ConfigureAwait(false); + } + finally + { + _writeAsyncSemaphore.Release(); + } + } + } + + internal sealed partial class SNINetworkStream + { + public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + return ReadAsync(new Memory(buffer, offset, count), cancellationToken).AsTask(); + } + + public override async ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default) + { + await _readAsyncSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + try + { + return await base.ReadAsync(buffer, cancellationToken).ConfigureAwait(false); + } + finally + { + _readAsyncSemaphore.Release(); + } + } + + // Prevent the WriteAsync collisions by running the task in a Semaphore Slim + public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + return WriteAsync(new Memory(buffer, offset, count), cancellationToken).AsTask(); + } + + public override async ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) + { + await _writeAsyncSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + try + { + await base.WriteAsync(buffer, cancellationToken).ConfigureAwait(false); + } + finally + { + _writeAsyncSemaphore.Release(); + } + } + } +} diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNIStreams.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNIStreams.cs index eb8661d022..389f25eeae 100644 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNIStreams.cs +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNIStreams.cs @@ -4,8 +4,6 @@ using System.Net.Security; using System.IO; -using System.Threading; -using System.Threading.Tasks; using System.Net.Sockets; namespace Microsoft.Data.SqlClient.SNI @@ -13,7 +11,7 @@ namespace Microsoft.Data.SqlClient.SNI /// /// This class extends SslStream to customize stream behavior for Managed SNI implementation. /// - internal class SNISslStream : SslStream + internal sealed partial class SNISslStream : SslStream { private readonly ConcurrentQueueSemaphore _writeAsyncSemaphore; private readonly ConcurrentQueueSemaphore _readAsyncSemaphore; @@ -24,40 +22,12 @@ public SNISslStream(Stream innerStream, bool leaveInnerStreamOpen, RemoteCertifi _writeAsyncSemaphore = new ConcurrentQueueSemaphore(1); _readAsyncSemaphore = new ConcurrentQueueSemaphore(1); } - - // Prevent ReadAsync collisions by running the task in a Semaphore Slim - public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) - { - await _readAsyncSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); - try - { - return await base.ReadAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false); - } - finally - { - _readAsyncSemaphore.Release(); - } - } - - // Prevent the WriteAsync collisions by running the task in a Semaphore Slim - public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) - { - await _writeAsyncSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); - try - { - await base.WriteAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false); - } - finally - { - _writeAsyncSemaphore.Release(); - } - } } /// /// This class extends NetworkStream to customize stream behavior for Managed SNI implementation. /// - internal class SNINetworkStream : NetworkStream + internal sealed partial class SNINetworkStream : NetworkStream { private readonly ConcurrentQueueSemaphore _writeAsyncSemaphore; private readonly ConcurrentQueueSemaphore _readAsyncSemaphore; @@ -67,33 +37,5 @@ public SNINetworkStream(Socket socket, bool ownsSocket) : base(socket, ownsSocke _writeAsyncSemaphore = new ConcurrentQueueSemaphore(1); _readAsyncSemaphore = new ConcurrentQueueSemaphore(1); } - - // Prevent ReadAsync collisions by running the task in a Semaphore Slim - public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) - { - await _readAsyncSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); - try - { - return await base.ReadAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false); - } - finally - { - _readAsyncSemaphore.Release(); - } - } - - // Prevent the WriteAsync collisions by running the task in a Semaphore Slim - public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) - { - await _writeAsyncSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); - try - { - await base.WriteAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false); - } - finally - { - _writeAsyncSemaphore.Release(); - } - } } } diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SqlUtil.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SqlUtil.cs index 745e820948..a292fa5d56 100644 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SqlUtil.cs +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SqlUtil.cs @@ -2169,56 +2169,4 @@ public static MethodInfo GetPromotedToken } } - /// - /// This class implements a FIFO Queue with SemaphoreSlim for ordered execution of parallel tasks. - /// Currently used in Managed SNI (SNISslStream) to override SslStream's WriteAsync implementation. - /// - internal class ConcurrentQueueSemaphore - { - private static readonly Action s_continuePop = ContinuePop; - - private readonly SemaphoreSlim _semaphore; - private readonly ConcurrentQueue> _queue = - new ConcurrentQueue>(); - - public ConcurrentQueueSemaphore(int initialCount) - { - _semaphore = new SemaphoreSlim(initialCount); - } - - public Task WaitAsync(CancellationToken cancellationToken) - { - // try sync wait with 0 which will not block to see if we need to do an async wait - if (_semaphore.Wait(0, cancellationToken)) - { - return Task.CompletedTask; - } - else - { - var tcs = new TaskCompletionSource(); - _queue.Enqueue(tcs); - _semaphore.WaitAsync().ContinueWith( - continuationAction: s_continuePop, - state: _queue, - cancellationToken: cancellationToken - ); - return tcs.Task; - } - } - - public void Release() - { - _semaphore.Release(); - } - - private static void ContinuePop(Task task, object state) - { - ConcurrentQueue> queue = (ConcurrentQueue>)state; - if (queue.TryDequeue(out TaskCompletionSource popped)) - { - popped.SetResult(true); - } - } - } - -}//namespace +}