Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Efficient RandomAccess async I/O on the thread pool. #55123

Merged
merged 10 commits into from
Jul 4, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -12,33 +12,34 @@ namespace Microsoft.Win32.SafeHandles
{
public sealed partial class SafeFileHandle : SafeHandleZeroOrMinusOneIsInvalid
{
private ValueTaskSource? _reusableValueTaskSource; // reusable ValueTaskSource that is currently NOT being used
private OverlappedValueTaskSource? _reusableOverlappedValueTaskSource; // reusable OverlappedValueTaskSource that is currently NOT being used

// Rent the reusable ValueTaskSource, or create a new one to use if we couldn't get one (which
// should only happen on first use or if the FileStream is being used concurrently).
internal ValueTaskSource GetValueTaskSource() => Interlocked.Exchange(ref _reusableValueTaskSource, null) ?? new ValueTaskSource(this);
// Rent the reusable OverlappedValueTaskSource, or create a new one to use if we couldn't get one (which
// should only happen on first use or if the SafeFileHandle is being used concurrently).
internal OverlappedValueTaskSource GetOverlappedValueTaskSource() =>
Interlocked.Exchange(ref _reusableOverlappedValueTaskSource, null) ?? new OverlappedValueTaskSource(this);

protected override bool ReleaseHandle()
{
bool result = Interop.Kernel32.CloseHandle(handle);

Interlocked.Exchange(ref _reusableValueTaskSource, null)?.Dispose();
Interlocked.Exchange(ref _reusableOverlappedValueTaskSource, null)?.Dispose();

return result;
}

private void TryToReuse(ValueTaskSource source)
private void TryToReuse(OverlappedValueTaskSource source)
{
source._source.Reset();

if (Interlocked.CompareExchange(ref _reusableValueTaskSource, source, null) is not null)
if (Interlocked.CompareExchange(ref _reusableOverlappedValueTaskSource, source, null) is not null)
{
source._preallocatedOverlapped.Dispose();
}
}

/// <summary>Reusable IValueTaskSource for FileStream ValueTask-returning async operations.</summary>
internal sealed unsafe class ValueTaskSource : IValueTaskSource<int>, IValueTaskSource
/// <summary>Reusable IValueTaskSource for RandomAccess async operations based on Overlapped I/O.</summary>
internal sealed unsafe class OverlappedValueTaskSource : IValueTaskSource<int>, IValueTaskSource
{
internal static readonly IOCompletionCallback s_ioCallback = IOCallback;

Expand All @@ -55,7 +56,7 @@ internal sealed unsafe class ValueTaskSource : IValueTaskSource<int>, IValueTask
/// </summary>
internal ulong _result;

internal ValueTaskSource(SafeFileHandle fileHandle)
internal OverlappedValueTaskSource(SafeFileHandle fileHandle)
{
_fileHandle = fileHandle;
_source.RunContinuationsAsynchronously = true;
Expand Down Expand Up @@ -112,7 +113,7 @@ internal void RegisterForCancellation(CancellationToken cancellationToken)
{
_cancellationRegistration = cancellationToken.UnsafeRegister(static (s, token) =>
{
ValueTaskSource vts = (ValueTaskSource)s!;
OverlappedValueTaskSource vts = (OverlappedValueTaskSource)s!;
if (!vts._fileHandle.IsInvalid)
{
try
Expand Down Expand Up @@ -156,7 +157,7 @@ internal void ReleaseResources()
// done by that cancellation registration, e.g. unregistering. As such, we use _result to both track who's
// responsible for calling Complete and for passing the necessary data between parties.

/// <summary>Invoked when AsyncWindowsFileStreamStrategy has finished scheduling the async operation.</summary>
/// <summary>Invoked when the async operation finished being scheduled.</summary>
internal void FinishedScheduling()
{
// Set the value to 1. If it was already non-0, then the asynchronous operation already completed but
Expand All @@ -172,7 +173,7 @@ internal void FinishedScheduling()
/// <summary>Invoked when the asynchronous operation has completed asynchronously.</summary>
private static void IOCallback(uint errorCode, uint numBytes, NativeOverlapped* pOverlapped)
{
ValueTaskSource? vts = (ValueTaskSource?)ThreadPoolBoundHandle.GetNativeOverlappedState(pOverlapped);
OverlappedValueTaskSource? vts = (OverlappedValueTaskSource?)ThreadPoolBoundHandle.GetNativeOverlappedState(pOverlapped);
Debug.Assert(vts is not null);
Debug.Assert(vts._overlapped == pOverlapped, "Overlaps don't match");

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using Internal.Runtime.CompilerServices;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Sources;

namespace Microsoft.Win32.SafeHandles
{
public sealed partial class SafeFileHandle : SafeHandleZeroOrMinusOneIsInvalid
{
private ThreadPoolValueTaskSource? _reusableThreadPoolValueTaskSource; // reusable ThreadPoolValueTaskSource that is currently NOT being used

// Rent the reusable ThreadPoolValueTaskSource, or create a new one to use if we couldn't get one (which
// should only happen on first use or if the SafeFileHandle is being used concurrently).
internal ThreadPoolValueTaskSource GetThreadPoolValueTaskSource() =>
Interlocked.Exchange(ref _reusableThreadPoolValueTaskSource, null) ?? new ThreadPoolValueTaskSource(this);

private void TryToReuse(ThreadPoolValueTaskSource source)
{
Interlocked.CompareExchange(ref _reusableThreadPoolValueTaskSource, source, null);
}

/// <summary>
/// A reusable <see cref="IValueTaskSource"/> implementation that
/// queues asynchronous <see cref="RandomAccess"/> operations to
/// be completed synchronously on the thread pool.
/// </summary>
internal sealed class ThreadPoolValueTaskSource : IThreadPoolWorkItem, IValueTaskSource<int>, IValueTaskSource<long>
{
private enum Operation : byte
{
None,
Read,
Write,
ReadScatter,
WriteGather
}
teo-tsirpanis marked this conversation as resolved.
Show resolved Hide resolved

private ManualResetValueTaskSourceCore<long> _source;
private readonly SafeFileHandle _fileHandle;
private Operation _operation = Operation.None;

// These fields store the parameters for the operation.
// The first two are common for all kinds of operations.
private long _fileOffset;
private CancellationToken _cancellationToken;
// Used by simple reads and writes. Will be unsafely cast to a memory when performing a read.
private ReadOnlyMemory<byte> _singleSegment;
// Used by vectored reads and writes. Is an IReadOnlyList of either Memory or ReadOnlyMemory of bytes.
private object? _multiSegmentCollection;
teo-tsirpanis marked this conversation as resolved.
Show resolved Hide resolved

internal ThreadPoolValueTaskSource(SafeFileHandle fileHandle)
{
_fileHandle = fileHandle;
_source.RunContinuationsAsynchronously = true;
teo-tsirpanis marked this conversation as resolved.
Show resolved Hide resolved
}

private long GetResultAndRelease(short token)
{
try
{
return _source.GetResult(token);
} finally
teo-tsirpanis marked this conversation as resolved.
Show resolved Hide resolved
{
_source.Reset();
_fileHandle.TryToReuse(this);
}
}

public ValueTaskSourceStatus GetStatus(short token) => _source.GetStatus(token);
public void OnCompleted(Action<object?> continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags) =>
_source.OnCompleted(continuation, state, token, flags);
int IValueTaskSource<int>.GetResult(short token) => (int) GetResultAndRelease(token);
teo-tsirpanis marked this conversation as resolved.
Show resolved Hide resolved
long IValueTaskSource<long>.GetResult(short token) => GetResultAndRelease(token);

void IThreadPoolWorkItem.Execute()
teo-tsirpanis marked this conversation as resolved.
Show resolved Hide resolved
{
Debug.Assert(_operation >= Operation.Read && _operation <= Operation.WriteGather);

long result = 0;
try {
teo-tsirpanis marked this conversation as resolved.
Show resolved Hide resolved
bool notifyWhenUnblocked = false;
teo-tsirpanis marked this conversation as resolved.
Show resolved Hide resolved
try
{
// This is the operation's last chance to be cancelled.
teo-tsirpanis marked this conversation as resolved.
Show resolved Hide resolved
if (_cancellationToken.IsCancellationRequested)
{
_source.SetException(new OperationCanceledException(_cancellationToken));
return;
}

notifyWhenUnblocked = ThreadPool.NotifyThreadBlocked();
teo-tsirpanis marked this conversation as resolved.
Show resolved Hide resolved
teo-tsirpanis marked this conversation as resolved.
Show resolved Hide resolved
switch (_operation)
{
case Operation.Read:
Memory<byte> writableSingleSegment = Unsafe.As<ReadOnlyMemory<byte>, Memory<byte>>(ref _singleSegment);
teo-tsirpanis marked this conversation as resolved.
Show resolved Hide resolved
result = RandomAccess.ReadAtOffset(_fileHandle, writableSingleSegment.Span, _fileOffset);
break;
case Operation.Write:
result = RandomAccess.WriteAtOffset(_fileHandle, _singleSegment.Span, _fileOffset);
break;
case Operation.ReadScatter:
Debug.Assert(_multiSegmentCollection != null && _multiSegmentCollection is IReadOnlyList<Memory<byte>>);
teo-tsirpanis marked this conversation as resolved.
Show resolved Hide resolved
result = RandomAccess.ReadScatterAtOffset(_fileHandle, (IReadOnlyList<Memory<byte>>) _multiSegmentCollection, _fileOffset);
break;
case Operation.WriteGather:
Debug.Assert(_multiSegmentCollection != null && _multiSegmentCollection is IReadOnlyList<ReadOnlyMemory<byte>>);
teo-tsirpanis marked this conversation as resolved.
Show resolved Hide resolved
result = RandomAccess.WriteGatherAtOffset(_fileHandle, (IReadOnlyList<ReadOnlyMemory<byte>>)_multiSegmentCollection, _fileOffset);
break;
}
} finally
teo-tsirpanis marked this conversation as resolved.
Show resolved Hide resolved
{
if (notifyWhenUnblocked)
ThreadPool.NotifyThreadUnblocked();
teo-tsirpanis marked this conversation as resolved.
Show resolved Hide resolved
_operation = Operation.None;
_fileOffset = 0;
teo-tsirpanis marked this conversation as resolved.
Show resolved Hide resolved
_cancellationToken = default;
_singleSegment = default;
_multiSegmentCollection = null;
}
} catch (Exception e)
teo-tsirpanis marked this conversation as resolved.
Show resolved Hide resolved
{
_source.SetException(e);
}
_source.SetResult(result);
teo-tsirpanis marked this conversation as resolved.
Show resolved Hide resolved
}

public ValueTask<int> QueueRead(Memory<byte> buffer, long fileOffset, CancellationToken cancellationToken)
{
Debug.Assert(_operation == Operation.None, "An operation was queued before the previous one's completion.");
teo-tsirpanis marked this conversation as resolved.
Show resolved Hide resolved

_operation = Operation.Read;
_singleSegment = buffer;
_fileOffset = fileOffset;
_cancellationToken = cancellationToken;
ThreadPool.UnsafeQueueUserWorkItem(this, false);
teo-tsirpanis marked this conversation as resolved.
Show resolved Hide resolved

return new ValueTask<int>(this, _source.Version);
}

public ValueTask<int> QueueWrite(ReadOnlyMemory<byte> buffer, long fileOffset, CancellationToken cancellationToken)
{
Debug.Assert(_operation == Operation.None, "An operation was queued before the previous one's completion.");

_operation = Operation.Write;
_singleSegment = buffer;
_fileOffset = fileOffset;
_cancellationToken = cancellationToken;
ThreadPool.UnsafeQueueUserWorkItem(this, false);

return new ValueTask<int>(this, _source.Version);
}

public ValueTask<long> QueueReadScatter(IReadOnlyList<Memory<byte>> buffers, long fileOffset, CancellationToken cancellationToken)
{
Debug.Assert(_operation == Operation.None, "An operation was queued before the previous one's completion.");

_operation = Operation.ReadScatter;
_multiSegmentCollection = buffers;
_fileOffset = fileOffset;
_cancellationToken = cancellationToken;
ThreadPool.UnsafeQueueUserWorkItem(this, false);

return new ValueTask<long>(this, _source.Version);
}

public ValueTask<long> QueueWriteGather(IReadOnlyList<ReadOnlyMemory<byte>> buffers, long fileOffset, CancellationToken cancellationToken)
{
Debug.Assert(_operation == Operation.None, "An operation was queued before the previous one's completion.");

_operation = Operation.WriteGather;
_multiSegmentCollection = buffers;
_fileOffset = fileOffset;
_cancellationToken = cancellationToken;
ThreadPool.UnsafeQueueUserWorkItem(this, false);

return new ValueTask<long>(this, _source.Version);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

namespace Microsoft.Win32.SafeHandles
{
public sealed class SafeFileHandle : SafeHandleZeroOrMinusOneIsInvalid
public sealed partial class SafeFileHandle : SafeHandleZeroOrMinusOneIsInvalid
{
// not using bool? as it's not thread safe
private volatile NullableBool _canSeek = NullableBool.Undefined;
Expand All @@ -23,11 +23,6 @@ private SafeFileHandle(bool ownsHandle)
SetHandle(new IntPtr(-1));
}

public SafeFileHandle(IntPtr preexistingHandle, bool ownsHandle) : this(ownsHandle)
{
SetHandle(preexistingHandle);
}

public bool IsAsync { get; private set; }

internal bool CanSeek => !IsClosed && GetCanSeek();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,6 @@ public SafeFileHandle() : base(true)
{
}

public SafeFileHandle(IntPtr preexistingHandle, bool ownsHandle) : base(ownsHandle)
{
SetHandle(preexistingHandle);
}

public bool IsAsync => (GetFileOptions() & FileOptions.Asynchronous) != 0;

internal bool CanSeek => !IsClosed && GetFileType() == Interop.Kernel32.FileTypes.FILE_TYPE_DISK;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System;

namespace Microsoft.Win32.SafeHandles
{
public sealed partial class SafeFileHandle : SafeHandleZeroOrMinusOneIsInvalid
{
public SafeFileHandle(IntPtr preexistingHandle, bool ownsHandle) : base(ownsHandle)
{
SetHandle(preexistingHandle);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@
<Compile Include="$(MSBuildThisFileDirectory)Microsoft\Win32\SafeHandles\CriticalHandleZeroOrMinusOneIsInvalid.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Microsoft\Win32\SafeHandles\SafeHandleMinusOneIsInvalid.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Microsoft\Win32\SafeHandles\SafeHandleZeroOrMinusOneIsInvalid.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Microsoft\Win32\SafeHandles\SafeFileHandle.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Microsoft\Win32\SafeHandles\SafeFileHandle.ThreadPoolValueTaskSource.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Microsoft\Win32\SafeHandles\SafeWaitHandle.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\AccessViolationException.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Action.cs" />
Expand Down Expand Up @@ -1517,7 +1519,7 @@
<Compile Include="$(CommonPath)Interop\Windows\Kernel32\Interop.GetModuleFileName.cs">
<Link>Common\Interop\Windows\Kernel32\Interop.GetModuleFileName.cs</Link>
</Compile>
<Compile Include="$(CommonPath)Interop\Windows\Kernel32\Interop.GetOverlappedResult.cs">
<Compile Include="$(CommonPath)Interop\Windows\Kernel32\Interop.GetOverlappedResult.cs">
<Link>Common\Interop\Windows\Kernel32\Interop.GetOverlappedResult.cs</Link>
</Compile>
<Compile Include="$(CommonPath)Interop\Windows\Kernel32\Interop.GetProcessMemoryInfo.cs">
Expand Down Expand Up @@ -1775,7 +1777,7 @@
<Compile Include="$(MSBuildThisFileDirectory)Internal\Console.Windows.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Internal\Win32\RegistryKey.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Microsoft\Win32\SafeHandles\SafeFileHandle.Windows.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Microsoft\Win32\SafeHandles\SafeFileHandle.ValueTaskSource.Windows.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Microsoft\Win32\SafeHandles\SafeFileHandle.OverlappedValueTaskSource.Windows.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Microsoft\Win32\SafeHandles\SafeFindHandle.Windows.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Microsoft\Win32\SafeHandles\SafeRegistryHandle.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Microsoft\Win32\SafeHandles\SafeRegistryHandle.Windows.cs" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ internal static long GetFileLength(SafeFileHandle handle, string? path)
return status.Size;
}

private static unsafe int ReadAtOffset(SafeFileHandle handle, Span<byte> buffer, long fileOffset)
internal static unsafe int ReadAtOffset(SafeFileHandle handle, Span<byte> buffer, long fileOffset)
{
fixed (byte* bufPtr = &MemoryMarshal.GetReference(buffer))
{
Expand All @@ -34,7 +34,7 @@ private static unsafe int ReadAtOffset(SafeFileHandle handle, Span<byte> buffer,
}
}

private static unsafe long ReadScatterAtOffset(SafeFileHandle handle, IReadOnlyList<Memory<byte>> buffers, long fileOffset)
internal static unsafe long ReadScatterAtOffset(SafeFileHandle handle, IReadOnlyList<Memory<byte>> buffers, long fileOffset)
{
MemoryHandle[] handles = new MemoryHandle[buffers.Count];
Span<Interop.Sys.IOVector> vectors = buffers.Count <= IovStackThreshold ? stackalloc Interop.Sys.IOVector[IovStackThreshold] : new Interop.Sys.IOVector[buffers.Count];
Expand Down Expand Up @@ -74,7 +74,7 @@ private static ValueTask<long> ReadScatterAtOffsetAsync(SafeFileHandle handle, I
long fileOffset, CancellationToken cancellationToken)
=> ScheduleSyncReadScatterAtOffsetAsync(handle, buffers, fileOffset, cancellationToken);

private static unsafe int WriteAtOffset(SafeFileHandle handle, ReadOnlySpan<byte> buffer, long fileOffset)
internal static unsafe int WriteAtOffset(SafeFileHandle handle, ReadOnlySpan<byte> buffer, long fileOffset)
{
fixed (byte* bufPtr = &MemoryMarshal.GetReference(buffer))
{
Expand All @@ -84,7 +84,7 @@ private static unsafe int WriteAtOffset(SafeFileHandle handle, ReadOnlySpan<byte
}
}

private static unsafe long WriteGatherAtOffset(SafeFileHandle handle, IReadOnlyList<ReadOnlyMemory<byte>> buffers, long fileOffset)
internal static unsafe long WriteGatherAtOffset(SafeFileHandle handle, IReadOnlyList<ReadOnlyMemory<byte>> buffers, long fileOffset)
{
MemoryHandle[] handles = new MemoryHandle[buffers.Count];
Span<Interop.Sys.IOVector> vectors = buffers.Count <= IovStackThreshold ? stackalloc Interop.Sys.IOVector[IovStackThreshold] : new Interop.Sys.IOVector[buffers.Count ];
Expand Down
Loading