Skip to content

Commit

Permalink
Split PortableThreadPool.WorkerThread start and loop body (#84490)
Browse files Browse the repository at this point in the history
This is Part 1 of #84489 - landing support for async JS interop on threadpool threads in multi-threaded WebAssembly.

We will need to start the threadpool worker threads on the browser in a special way, such that they can exit back to the JS event loop, and use callbacks to run the worker loop body.

The current PR splits into a separate file the logic for starting threadpool worker threads, and the outer loop that waits for the semaphore that signals that work is available for the worker. The loop body (to be shared with the callback-based approach in a future PR) remains in PortableThreadPool.WorkerThread.cs as several new toplevel functions.

Current PR is just refactoring existing code. No functional change.

* Split PortableThreadPool.WorkerThread start and loop body

   For browser-wasm we will need to start the worker thread in a special way, and use callbacks to run the loop body.

   Current PR is just refactoring existing code. No functional change.

* Change loop to use return instead of break

* rename utility method to ShouldExitWorker
  • Loading branch information
lambdageek committed Apr 18, 2023
1 parent 790b14b commit a4e2609
Show file tree
Hide file tree
Showing 3 changed files with 170 additions and 133 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2527,6 +2527,7 @@
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.ThreadCounts.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.WaitThread.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.WorkerThread.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.WorkerThread.NonBrowser.cs"/>
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.WorkerTracking.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.Unix.cs" Condition="'$(TargetsUnix)' == 'true' or '$(TargetsBrowser)' == 'true' or '$(TargetsWasi)' == 'true'" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.Windows.cs" Condition="'$(TargetsWindows)' == 'true'" />
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Diagnostics.Tracing;

namespace System.Threading
{
internal sealed partial class PortableThreadPool
{
/// <summary>
/// The worker thread infastructure for the CLR thread pool.
/// </summary>
private static partial class WorkerThread
{

/// <summary>
/// Semaphore for controlling how many threads are currently working.
/// </summary>
private static readonly LowLevelLifoSemaphore s_semaphore =
new LowLevelLifoSemaphore(
0,
MaxPossibleThreadCount,
AppContextConfigHelper.GetInt32Config(
"System.Threading.ThreadPool.UnfairSemaphoreSpinLimit",
SemaphoreSpinCountDefault,
false),
onWait: () =>
{
if (NativeRuntimeEventSource.Log.IsEnabled())
{
NativeRuntimeEventSource.Log.ThreadPoolWorkerThreadWait(
(uint)ThreadPoolInstance._separated.counts.VolatileRead().NumExistingThreads);
}
});

private static readonly ThreadStart s_workerThreadStart = WorkerThreadStart;

private static void WorkerThreadStart()
{
Thread.CurrentThread.SetThreadPoolWorkerThreadName();

PortableThreadPool threadPoolInstance = ThreadPoolInstance;

if (NativeRuntimeEventSource.Log.IsEnabled())
{
NativeRuntimeEventSource.Log.ThreadPoolWorkerThreadStart(
(uint)threadPoolInstance._separated.counts.VolatileRead().NumExistingThreads);
}

LowLevelLock threadAdjustmentLock = threadPoolInstance._threadAdjustmentLock;
LowLevelLifoSemaphore semaphore = s_semaphore;

while (true)
{
bool spinWait = true;
while (semaphore.Wait(ThreadPoolThreadTimeoutMs, spinWait))
{
WorkerDoWork(threadPoolInstance, ref spinWait);
}

if (ShouldExitWorker(threadPoolInstance, threadAdjustmentLock))
{
break;
}
}
}


private static void CreateWorkerThread()
{
// Thread pool threads must start in the default execution context without transferring the context, so
// using UnsafeStart() instead of Start()
Thread workerThread = new Thread(s_workerThreadStart);
workerThread.IsThreadPoolThread = true;
workerThread.IsBackground = true;
// thread name will be set in thread proc
workerThread.UnsafeStart();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// The .NET Foundation licenses this file to you under the MIT license.

using System.Diagnostics.Tracing;
using System.Runtime.CompilerServices;

namespace System.Threading
{
Expand All @@ -28,147 +29,112 @@ private static partial class WorkerThread
// preexisting threads from running out of memory when using new stack space in low-memory situations.
public const int EstimatedAdditionalStackUsagePerThreadBytes = 64 << 10; // 64 KB

/// <summary>
/// Semaphore for controlling how many threads are currently working.
/// </summary>
private static readonly LowLevelLifoSemaphore s_semaphore =
new LowLevelLifoSemaphore(
0,
MaxPossibleThreadCount,
AppContextConfigHelper.GetInt32Config(
"System.Threading.ThreadPool.UnfairSemaphoreSpinLimit",
SemaphoreSpinCountDefault,
false),
onWait: () =>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static void WorkerDoWork(PortableThreadPool threadPoolInstance, ref bool spinWait)
{
bool alreadyRemovedWorkingWorker = false;
while (TakeActiveRequest(threadPoolInstance))
{
threadPoolInstance._separated.lastDequeueTime = Environment.TickCount;
if (!ThreadPoolWorkQueue.Dispatch())
{
if (NativeRuntimeEventSource.Log.IsEnabled())
{
NativeRuntimeEventSource.Log.ThreadPoolWorkerThreadWait(
(uint)ThreadPoolInstance._separated.counts.VolatileRead().NumExistingThreads);
}
});
// ShouldStopProcessingWorkNow() caused the thread to stop processing work, and it would have
// already removed this working worker in the counts. This typically happens when hill climbing
// decreases the worker thread count goal.
alreadyRemovedWorkingWorker = true;
break;
}

private static readonly ThreadStart s_workerThreadStart = WorkerThreadStart;
if (threadPoolInstance._separated.numRequestedWorkers <= 0)
{
break;
}

private static void WorkerThreadStart()
{
Thread.CurrentThread.SetThreadPoolWorkerThreadName();
// In highly bursty cases with short bursts of work, especially in the portable thread pool
// implementation, worker threads are being released and entering Dispatch very quickly, not finding
// much work in Dispatch, and soon afterwards going back to Dispatch, causing extra thrashing on
// data and some interlocked operations, and similarly when the thread pool runs out of work. Since
// there is a pending request for work, introduce a slight delay before serving the next request.
// The spin-wait is mainly for when the sleep is not effective due to there being no other threads
// to schedule.
Thread.UninterruptibleSleep0();
if (!Environment.IsSingleProcessor)
{
Thread.SpinWait(1);
}
}

PortableThreadPool threadPoolInstance = ThreadPoolInstance;
// Don't spin-wait on the semaphore next time if the thread was actively stopped from processing work,
// as it's unlikely that the worker thread count goal would be increased again so soon afterwards that
// the semaphore would be released within the spin-wait window
spinWait = !alreadyRemovedWorkingWorker;

if (NativeRuntimeEventSource.Log.IsEnabled())
if (!alreadyRemovedWorkingWorker)
{
NativeRuntimeEventSource.Log.ThreadPoolWorkerThreadStart(
(uint)threadPoolInstance._separated.counts.VolatileRead().NumExistingThreads);
// If we woke up but couldn't find a request, or ran out of work items to process, we need to update
// the number of working workers to reflect that we are done working for now
RemoveWorkingWorker(threadPoolInstance);
}
}

LowLevelLock threadAdjustmentLock = threadPoolInstance._threadAdjustmentLock;
LowLevelLifoSemaphore semaphore = s_semaphore;
// returns true if the worker is shutting down
// returns false if we should do another iteration
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static bool ShouldExitWorker (PortableThreadPool threadPoolInstance, LowLevelLock threadAdjustmentLock)
{
// The thread cannot exit if it has IO pending, otherwise the IO may be canceled
if (IsIOPending)
{
return false;
}

while (true)
threadAdjustmentLock.Acquire();
try
{
bool spinWait = true;
while (semaphore.Wait(ThreadPoolThreadTimeoutMs, spinWait))
// At this point, the thread's wait timed out. We are shutting down this thread.
// We are going to decrement the number of existing threads to no longer include this one
// and then change the max number of threads in the thread pool to reflect that we don't need as many
// as we had. Finally, we are going to tell hill climbing that we changed the max number of threads.
ThreadCounts counts = threadPoolInstance._separated.counts;
while (true)
{
bool alreadyRemovedWorkingWorker = false;
while (TakeActiveRequest(threadPoolInstance))
{
threadPoolInstance._separated.lastDequeueTime = Environment.TickCount;
if (!ThreadPoolWorkQueue.Dispatch())
{
// ShouldStopProcessingWorkNow() caused the thread to stop processing work, and it would have
// already removed this working worker in the counts. This typically happens when hill climbing
// decreases the worker thread count goal.
alreadyRemovedWorkingWorker = true;
break;
}

if (threadPoolInstance._separated.numRequestedWorkers <= 0)
{
break;
}

// In highly bursty cases with short bursts of work, especially in the portable thread pool
// implementation, worker threads are being released and entering Dispatch very quickly, not finding
// much work in Dispatch, and soon afterwards going back to Dispatch, causing extra thrashing on
// data and some interlocked operations, and similarly when the thread pool runs out of work. Since
// there is a pending request for work, introduce a slight delay before serving the next request.
// The spin-wait is mainly for when the sleep is not effective due to there being no other threads
// to schedule.
Thread.UninterruptibleSleep0();
if (!Environment.IsSingleProcessor)
{
Thread.SpinWait(1);
}
}

// Don't spin-wait on the semaphore next time if the thread was actively stopped from processing work,
// as it's unlikely that the worker thread count goal would be increased again so soon afterwards that
// the semaphore would be released within the spin-wait window
spinWait = !alreadyRemovedWorkingWorker;

if (!alreadyRemovedWorkingWorker)
// Since this thread is currently registered as an existing thread, if more work comes in meanwhile,
// this thread would be expected to satisfy the new work. Ensure that NumExistingThreads is not
// decreased below NumProcessingWork, as that would be indicative of such a case.
if (counts.NumExistingThreads <= counts.NumProcessingWork)
{
// If we woke up but couldn't find a request, or ran out of work items to process, we need to update
// the number of working workers to reflect that we are done working for now
RemoveWorkingWorker(threadPoolInstance);
// In this case, enough work came in that this thread should not time out and should go back to work.
return false;
}
}

// The thread cannot exit if it has IO pending, otherwise the IO may be canceled
if (IsIOPending)
{
continue;
}

threadAdjustmentLock.Acquire();
try
{
// At this point, the thread's wait timed out. We are shutting down this thread.
// We are going to decrement the number of existing threads to no longer include this one
// and then change the max number of threads in the thread pool to reflect that we don't need as many
// as we had. Finally, we are going to tell hill climbing that we changed the max number of threads.
ThreadCounts counts = threadPoolInstance._separated.counts;
while (true)
ThreadCounts newCounts = counts;
short newNumExistingThreads = --newCounts.NumExistingThreads;
short newNumThreadsGoal =
Math.Max(
threadPoolInstance.MinThreadsGoal,
Math.Min(newNumExistingThreads, counts.NumThreadsGoal));
newCounts.NumThreadsGoal = newNumThreadsGoal;

ThreadCounts oldCounts =
threadPoolInstance._separated.counts.InterlockedCompareExchange(newCounts, counts);
if (oldCounts == counts)
{
// Since this thread is currently registered as an existing thread, if more work comes in meanwhile,
// this thread would be expected to satisfy the new work. Ensure that NumExistingThreads is not
// decreased below NumProcessingWork, as that would be indicative of such a case.
if (counts.NumExistingThreads <= counts.NumProcessingWork)
HillClimbing.ThreadPoolHillClimber.ForceChange(
newNumThreadsGoal,
HillClimbing.StateOrTransition.ThreadTimedOut);
if (NativeRuntimeEventSource.Log.IsEnabled())
{
// In this case, enough work came in that this thread should not time out and should go back to work.
break;
NativeRuntimeEventSource.Log.ThreadPoolWorkerThreadStop((uint)newNumExistingThreads);
}

ThreadCounts newCounts = counts;
short newNumExistingThreads = --newCounts.NumExistingThreads;
short newNumThreadsGoal =
Math.Max(
threadPoolInstance.MinThreadsGoal,
Math.Min(newNumExistingThreads, counts.NumThreadsGoal));
newCounts.NumThreadsGoal = newNumThreadsGoal;

ThreadCounts oldCounts =
threadPoolInstance._separated.counts.InterlockedCompareExchange(newCounts, counts);
if (oldCounts == counts)
{
HillClimbing.ThreadPoolHillClimber.ForceChange(
newNumThreadsGoal,
HillClimbing.StateOrTransition.ThreadTimedOut);
if (NativeRuntimeEventSource.Log.IsEnabled())
{
NativeRuntimeEventSource.Log.ThreadPoolWorkerThreadStop((uint)newNumExistingThreads);
}
return;
}

counts = oldCounts;
return true;
}

counts = oldCounts;
}
finally
{
threadAdjustmentLock.Release();
}
}
finally
{
threadAdjustmentLock.Release();
}
}

Expand Down Expand Up @@ -300,17 +266,6 @@ private static bool TakeActiveRequest(PortableThreadPool threadPoolInstance)
}
return false;
}

private static void CreateWorkerThread()
{
// Thread pool threads must start in the default execution context without transferring the context, so
// using UnsafeStart() instead of Start()
Thread workerThread = new Thread(s_workerThreadStart);
workerThread.IsThreadPoolThread = true;
workerThread.IsBackground = true;
// thread name will be set in thread proc
workerThread.UnsafeStart();
}
}
}
}

0 comments on commit a4e2609

Please sign in to comment.