diff --git a/src/coreclr/System.Private.CoreLib/src/System/Threading/ThreadPool.CoreCLR.cs b/src/coreclr/System.Private.CoreLib/src/System/Threading/ThreadPool.CoreCLR.cs index 95b283e707c69..86dd19a5a7560 100644 --- a/src/coreclr/System.Private.CoreLib/src/System/Threading/ThreadPool.CoreCLR.cs +++ b/src/coreclr/System.Private.CoreLib/src/System/Threading/ThreadPool.CoreCLR.cs @@ -522,6 +522,15 @@ internal static void NotifyWorkItemProgress() [MethodImpl(MethodImplOptions.InternalCall)] private static extern void NotifyWorkItemProgressNative(); + internal static bool NotifyThreadBlocked() => + UsePortableThreadPool && PortableThreadPool.ThreadPoolInstance.NotifyThreadBlocked(); + + internal static void NotifyThreadUnblocked() + { + Debug.Assert(UsePortableThreadPool); + PortableThreadPool.ThreadPoolInstance.NotifyThreadUnblocked(); + } + internal static object? GetOrCreateThreadLocalCompletionCountObject() => UsePortableThreadPool ? PortableThreadPool.ThreadPoolInstance.GetOrCreateThreadLocalCompletionCountObject() : null; diff --git a/src/libraries/System.Private.CoreLib/src/System.Private.CoreLib.Shared.projitems b/src/libraries/System.Private.CoreLib/src/System.Private.CoreLib.Shared.projitems index 22a9b4141336e..8c0515110d95c 100644 --- a/src/libraries/System.Private.CoreLib/src/System.Private.CoreLib.Shared.projitems +++ b/src/libraries/System.Private.CoreLib/src/System.Private.CoreLib.Shared.projitems @@ -2166,9 +2166,10 @@ - + + diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/NativeRuntimeEventSource.PortableThreadPool.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/NativeRuntimeEventSource.PortableThreadPool.cs index 2dcce9f59a073..785000089bdaa 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/NativeRuntimeEventSource.PortableThreadPool.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/NativeRuntimeEventSource.PortableThreadPool.cs @@ -66,7 +66,8 @@ public enum 
ThreadAdjustmentReasonMap : uint ChangePoint, Stabilizing, Starvation, - ThreadTimedOut + ThreadTimedOut, + CooperativeBlocking, } #if !ES_BUILD_STANDALONE diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.Blocking.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.Blocking.cs new file mode 100644 index 0000000000000..1ac8ecb60c91d --- /dev/null +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.Blocking.cs @@ -0,0 +1,340 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Diagnostics; + +namespace System.Threading +{ + internal sealed partial class PortableThreadPool + { + public short MinThreadsGoal + { + get + { + _threadAdjustmentLock.VerifyIsLocked(); + return Math.Min(_separated.numThreadsGoal, TargetThreadsGoalForBlockingAdjustment); + } + } + + private short TargetThreadsGoalForBlockingAdjustment + { + get + { + _threadAdjustmentLock.VerifyIsLocked(); + + return + _numBlockedThreads <= 0 + ? 
_minThreads + : (short)Math.Min((ushort)(_minThreads + _numBlockedThreads), (ushort)_maxThreads); + } + } + + public bool NotifyThreadBlocked() + { + if (!BlockingConfig.IsCooperativeBlockingEnabled || !Thread.CurrentThread.IsThreadPoolThread) + { + return false; + } + + bool wakeGateThread = false; + _threadAdjustmentLock.Acquire(); + try + { + _numBlockedThreads++; + Debug.Assert(_numBlockedThreads > 0); + + if (_pendingBlockingAdjustment != PendingBlockingAdjustment.WithDelayIfNecessary && + _separated.numThreadsGoal < TargetThreadsGoalForBlockingAdjustment) + { + if (_pendingBlockingAdjustment == PendingBlockingAdjustment.None) + { + wakeGateThread = true; + } + _pendingBlockingAdjustment = PendingBlockingAdjustment.WithDelayIfNecessary; + } + } + finally + { + _threadAdjustmentLock.Release(); + } + + if (wakeGateThread) + { + GateThread.Wake(this); + } + return true; + } + + public void NotifyThreadUnblocked() + { + Debug.Assert(BlockingConfig.IsCooperativeBlockingEnabled); + Debug.Assert(Thread.CurrentThread.IsThreadPoolThread); + + bool wakeGateThread = false; + _threadAdjustmentLock.Acquire(); + try + { + Debug.Assert(_numBlockedThreads > 0); + _numBlockedThreads--; + + if (_pendingBlockingAdjustment != PendingBlockingAdjustment.Immediately && + _numThreadsAddedDueToBlocking > 0 && + _separated.numThreadsGoal > TargetThreadsGoalForBlockingAdjustment) + { + wakeGateThread = true; + _pendingBlockingAdjustment = PendingBlockingAdjustment.Immediately; + } + } + finally + { + _threadAdjustmentLock.Release(); + } + + if (wakeGateThread) + { + GateThread.Wake(this); + } + } + + private uint PerformBlockingAdjustment(bool previousDelayElapsed) + { + uint nextDelayMs; + bool addWorker; + _threadAdjustmentLock.Acquire(); + try + { + nextDelayMs = PerformBlockingAdjustment(previousDelayElapsed, out addWorker); + } + finally + { + _threadAdjustmentLock.Release(); + } + + if (addWorker) + { + WorkerThread.MaybeAddWorkingWorker(this); + } + return nextDelayMs; + } + + 
private uint PerformBlockingAdjustment(bool previousDelayElapsed, out bool addWorker) + { + _threadAdjustmentLock.VerifyIsLocked(); + Debug.Assert(_pendingBlockingAdjustment != PendingBlockingAdjustment.None); + + _pendingBlockingAdjustment = PendingBlockingAdjustment.None; + addWorker = false; + + short targetThreadsGoal = TargetThreadsGoalForBlockingAdjustment; + short numThreadsGoal = _separated.numThreadsGoal; + if (numThreadsGoal == targetThreadsGoal) + { + return 0; + } + + if (numThreadsGoal > targetThreadsGoal) + { + // The goal is only decreased by how much it was increased in total due to blocking adjustments. This is to + // allow blocking adjustments to play well with starvation and hill climbing, either of which may increase the + // goal independently for other reasons, and blocking adjustments should not undo those changes. + if (_numThreadsAddedDueToBlocking <= 0) + { + return 0; + } + + short toSubtract = Math.Min((short)(numThreadsGoal - targetThreadsGoal), _numThreadsAddedDueToBlocking); + _numThreadsAddedDueToBlocking -= toSubtract; + _separated.numThreadsGoal = numThreadsGoal -= toSubtract; + HillClimbing.ThreadPoolHillClimber.ForceChange( + numThreadsGoal, + HillClimbing.StateOrTransition.CooperativeBlocking); + return 0; + } + + short configuredMaxThreadsWithoutDelay = + (short)Math.Min((ushort)(_minThreads + BlockingConfig.ThreadsToAddWithoutDelay), (ushort)_maxThreads); + + do + { + // Calculate how many threads can be added without a delay. Threads that were already created but may be just + // waiting for work can be released for work without a delay, but creating a new thread may need a delay. 
+ ThreadCounts counts = _separated.counts; + short maxThreadsGoalWithoutDelay = + Math.Max(configuredMaxThreadsWithoutDelay, Math.Min(counts.NumExistingThreads, _maxThreads)); + short targetThreadsGoalWithoutDelay = Math.Min(targetThreadsGoal, maxThreadsGoalWithoutDelay); + short newNumThreadsGoal; + if (numThreadsGoal < targetThreadsGoalWithoutDelay) + { + newNumThreadsGoal = targetThreadsGoalWithoutDelay; + } + else if (previousDelayElapsed) + { + newNumThreadsGoal = (short)(numThreadsGoal + 1); + } + else + { + // Need to induce a delay before adding a thread + break; + } + + do + { + if (newNumThreadsGoal <= counts.NumExistingThreads) + { + break; + } + + // + // Threads would likely need to be created to compensate for blocking, so check memory usage and limits + // + + long memoryLimitBytes = _memoryLimitBytes; + if (memoryLimitBytes <= 0) + { + break; + } + + // Memory usage is updated after gen 2 GCs, and roughly represents how much physical memory was in use at + // the time of the last gen 2 GC. When new threads are also blocking, they may not have used their typical + // amount of stack space, and gen 2 GCs may not be happening to update the memory usage. Account for a bit + // of extra stack space usage in the future for each thread. + long memoryUsageBytes = + _memoryUsageBytes + + counts.NumExistingThreads * (long)WorkerThread.EstimatedAdditionalStackUsagePerThreadBytes; + + // The memory limit may already be less than the total amount of physical memory. We are only accounting for + // thread pool worker threads above, and after fallback starvation may have to continue creating threads + // slowly to prevent a deadlock, so calculate a threshold before falling back by giving the memory limit + // some additional buffer. 
+ long memoryThresholdForFallbackBytes = memoryLimitBytes * 8 / 10; + if (memoryUsageBytes >= memoryThresholdForFallbackBytes) + { + return 0; + } + + // Determine how many threads can be added without exceeding the memory threshold + long achievableNumThreadsGoal = + counts.NumExistingThreads + + (memoryThresholdForFallbackBytes - memoryUsageBytes) / + WorkerThread.EstimatedAdditionalStackUsagePerThreadBytes; + newNumThreadsGoal = (short)Math.Min(newNumThreadsGoal, achievableNumThreadsGoal); + if (newNumThreadsGoal <= numThreadsGoal) + { + return 0; + } + } while (false); + + _numThreadsAddedDueToBlocking += (short)(newNumThreadsGoal - numThreadsGoal); + _separated.numThreadsGoal = newNumThreadsGoal; + HillClimbing.ThreadPoolHillClimber.ForceChange( + newNumThreadsGoal, + HillClimbing.StateOrTransition.CooperativeBlocking); + if (counts.NumProcessingWork >= numThreadsGoal && _separated.numRequestedWorkers > 0) + { + addWorker = true; + } + + numThreadsGoal = newNumThreadsGoal; + if (numThreadsGoal >= targetThreadsGoal) + { + return 0; + } + } while (false); + + // Calculate how much delay to induce before another thread is created. These operations don't overflow because of + // limits on max thread count and max delays. 
+ _pendingBlockingAdjustment = PendingBlockingAdjustment.WithDelayIfNecessary; + int delayStepCount = 1 + (numThreadsGoal - configuredMaxThreadsWithoutDelay) / BlockingConfig.ThreadsPerDelayStep; + return Math.Min((uint)delayStepCount * BlockingConfig.DelayStepMs, BlockingConfig.MaxDelayMs); + } + + private enum PendingBlockingAdjustment : byte + { + None, + Immediately, + WithDelayIfNecessary + } + + private static class BlockingConfig + { + public static readonly bool IsCooperativeBlockingEnabled = + AppContextConfigHelper.GetBooleanConfig("System.Threading.ThreadPool.Blocking.CooperativeBlocking", true); + + public static readonly short ThreadsToAddWithoutDelay; + public static readonly short ThreadsPerDelayStep; + public static readonly uint DelayStepMs; + public static readonly uint MaxDelayMs; + +#pragma warning disable CA1810 // remove the explicit static constructor + static BlockingConfig() + { + // Summary description of how blocking compensation works and how the config settings below are used: + // - After the thread count based on MinThreads is reached, up to ThreadsToAddWithoutDelay additional threads + // may be created without a delay + // - After that, before each additional thread is created, a delay is induced, starting with DelayStepMs + // - For every ThreadsPerDelayStep threads that are added with a delay, an additional DelayStepMs is added to + // the delay + // - The delay may not exceed MaxDelayMs + // - Delays are only induced before creating threads. If threads are already available, they would be released + // without delay to compensate for cooperative blocking. 
+ // - Physical memory usage and limits are also used and beyond a threshold, the system switches to fallback mode + // where threads would be created if starvation is detected, typically with higher delays + + // After the thread count based on MinThreads is reached, this value (after it is multiplied by the processor + // count) specifies how many additional threads may be created without a delay + int blocking_threadsToAddWithoutDelay_procCountFactor = + AppContextConfigHelper.GetInt32Config( + "System.Threading.ThreadPool.Blocking.ThreadsToAddWithoutDelay_ProcCountFactor", + 1, + false); + + // After the thread count based on ThreadsToAddWithoutDelay is reached, this value (after it is multiplied by + // the processor count) specifies after how many threads an additional DelayStepMs would be added to the delay + // before each new thread is created + int blocking_threadsPerDelayStep_procCountFactor = + AppContextConfigHelper.GetInt32Config( + "System.Threading.ThreadPool.Blocking.ThreadsPerDelayStep_ProcCountFactor", + 1, + false); + + // After the thread count based on ThreadsToAddWithoutDelay is reached, this value specifies how much additional + // delay to add per ThreadsPerDelayStep threads, which would be applied before each new thread is created + DelayStepMs = + (uint)AppContextConfigHelper.GetInt32Config( + "System.Threading.ThreadPool.Blocking.DelayStepMs", + 25, + false); + + // After the thread count based on ThreadsToAddWithoutDelay is reached, this value specifies the max delay to + // use before each new thread is created + MaxDelayMs = + (uint)AppContextConfigHelper.GetInt32Config( + "System.Threading.ThreadPool.Blocking.MaxDelayMs", + 250, + false); + + int processorCount = Environment.ProcessorCount; + ThreadsToAddWithoutDelay = (short)(processorCount * blocking_threadsToAddWithoutDelay_procCountFactor); + if (ThreadsToAddWithoutDelay > MaxPossibleThreadCount || + ThreadsToAddWithoutDelay / processorCount != 
blocking_threadsToAddWithoutDelay_procCountFactor) + { + ThreadsToAddWithoutDelay = MaxPossibleThreadCount; + } + + blocking_threadsPerDelayStep_procCountFactor = Math.Max(1, blocking_threadsPerDelayStep_procCountFactor); + short maxThreadsPerDelayStep = (short)(MaxPossibleThreadCount - ThreadsToAddWithoutDelay); + ThreadsPerDelayStep = + (short)(processorCount * blocking_threadsPerDelayStep_procCountFactor); + if (ThreadsPerDelayStep > maxThreadsPerDelayStep || + ThreadsPerDelayStep / processorCount != blocking_threadsPerDelayStep_procCountFactor) + { + ThreadsPerDelayStep = maxThreadsPerDelayStep; + } + + MaxDelayMs = Math.Max(1, Math.Min(MaxDelayMs, GateThread.GateActivitiesPeriodMs)); + DelayStepMs = Math.Max(1, Math.Min(DelayStepMs, MaxDelayMs)); + } +#pragma warning restore CA1810 + } + } +} diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.GateThread.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.GateThread.cs index eba5c66d7e0a3..fefe16da418eb 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.GateThread.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.GateThread.cs @@ -12,14 +12,14 @@ internal sealed partial class PortableThreadPool { private static class GateThread { - private const int GateThreadDelayMs = 500; - private const int DequeueDelayThresholdMs = GateThreadDelayMs * 2; + public const uint GateActivitiesPeriodMs = 500; + private const uint DequeueDelayThresholdMs = GateActivitiesPeriodMs * 2; private const int GateThreadRunningMask = 0x4; - - private static readonly AutoResetEvent s_runGateThreadEvent = new AutoResetEvent(initialState: true); - private const int MaxRuns = 2; + private static readonly AutoResetEvent RunGateThreadEvent = new AutoResetEvent(initialState: true); + private static readonly AutoResetEvent DelayEvent = new AutoResetEvent(initialState: false); + private static void 
GateThreadStart() { bool disableStarvationDetection = @@ -33,16 +33,68 @@ private static void GateThreadStart() _ = cpuUtilizationReader.CurrentUtilization; PortableThreadPool threadPoolInstance = ThreadPoolInstance; - LowLevelLock hillClimbingThreadAdjustmentLock = threadPoolInstance._hillClimbingThreadAdjustmentLock; + LowLevelLock threadAdjustmentLock = threadPoolInstance._threadAdjustmentLock; + DelayHelper delayHelper = default; + + if (BlockingConfig.IsCooperativeBlockingEnabled) + { + // Initialize memory usage and limits, and register to update them on gen 2 GCs + threadPoolInstance.OnGen2GCCallback(); + Gen2GcCallback.Register(threadPoolInstance.OnGen2GCCallback); + } while (true) { - s_runGateThreadEvent.WaitOne(); + RunGateThreadEvent.WaitOne(); + int currentTimeMs = Environment.TickCount; + delayHelper.SetGateActivitiesTime(currentTimeMs); - bool needGateThreadForRuntime; - do + while (true) { - Thread.Sleep(GateThreadDelayMs); + bool wasSignaledToWake = DelayEvent.WaitOne((int)delayHelper.GetNextDelay(currentTimeMs)); + currentTimeMs = Environment.TickCount; + + // Thread count adjustment for cooperative blocking + do + { + PendingBlockingAdjustment pendingBlockingAdjustment = threadPoolInstance._pendingBlockingAdjustment; + if (pendingBlockingAdjustment == PendingBlockingAdjustment.None) + { + delayHelper.ClearBlockingAdjustmentDelay(); + break; + } + + bool previousDelayElapsed = false; + if (delayHelper.HasBlockingAdjustmentDelay) + { + previousDelayElapsed = + delayHelper.HasBlockingAdjustmentDelayElapsed(currentTimeMs, wasSignaledToWake); + if (pendingBlockingAdjustment == PendingBlockingAdjustment.WithDelayIfNecessary && + !previousDelayElapsed) + { + break; + } + } + + uint nextDelayMs = threadPoolInstance.PerformBlockingAdjustment(previousDelayElapsed); + if (nextDelayMs <= 0) + { + delayHelper.ClearBlockingAdjustmentDelay(); + } + else + { + delayHelper.SetBlockingAdjustmentTimeAndDelay(currentTimeMs, nextDelayMs); + } + } while (false); + + 
// + // Periodic gate activities + // + + if (!delayHelper.ShouldPerformGateActivities(currentTimeMs, wasSignaledToWake)) + { + continue; + } if (ThreadPool.EnableWorkerTracking && NativeRuntimeEventSource.Log.IsEnabled()) { @@ -53,17 +105,17 @@ private static void GateThreadStart() int cpuUtilization = cpuUtilizationReader.CurrentUtilization; threadPoolInstance._cpuUtilization = cpuUtilization; - needGateThreadForRuntime = ThreadPool.PerformRuntimeSpecificGateActivities(cpuUtilization); + bool needGateThreadForRuntime = ThreadPool.PerformRuntimeSpecificGateActivities(cpuUtilization); if (!disableStarvationDetection && + threadPoolInstance._pendingBlockingAdjustment == PendingBlockingAdjustment.None && threadPoolInstance._separated.numRequestedWorkers > 0 && SufficientDelaySinceLastDequeue(threadPoolInstance)) { + bool addWorker = false; + threadAdjustmentLock.Acquire(); try { - hillClimbingThreadAdjustmentLock.Acquire(); - ThreadCounts counts = threadPoolInstance._separated.counts.VolatileRead(); - // Don't add a thread if we're at max or if we are already in the process of adding threads. // This logic is slightly different from the native implementation in CoreCLR because there are // no retired threads. In the native implementation, when hill climbing reduces the thread count @@ -73,61 +125,67 @@ private static void GateThreadStart() // stopped from working by hill climbing, so here the number of threads processing work, instead // of the number of existing threads, is compared with the goal. There may be alternative // solutions, for now this is only to maintain consistency in behavior. 
- while ( - counts.NumExistingThreads < threadPoolInstance._maxThreads && - counts.NumProcessingWork >= counts.NumThreadsGoal) + ThreadCounts counts = threadPoolInstance._separated.counts; + if (counts.NumProcessingWork < threadPoolInstance._maxThreads && + counts.NumProcessingWork >= threadPoolInstance._separated.numThreadsGoal) { if (debuggerBreakOnWorkStarvation) { Debugger.Break(); } - ThreadCounts newCounts = counts; short newNumThreadsGoal = (short)(counts.NumProcessingWork + 1); - newCounts.NumThreadsGoal = newNumThreadsGoal; - - ThreadCounts oldCounts = threadPoolInstance._separated.counts.InterlockedCompareExchange(newCounts, counts); - if (oldCounts == counts) - { - HillClimbing.ThreadPoolHillClimber.ForceChange(newNumThreadsGoal, HillClimbing.StateOrTransition.Starvation); - WorkerThread.MaybeAddWorkingWorker(threadPoolInstance); - break; - } - - counts = oldCounts; + threadPoolInstance._separated.numThreadsGoal = newNumThreadsGoal; + HillClimbing.ThreadPoolHillClimber.ForceChange( + newNumThreadsGoal, + HillClimbing.StateOrTransition.Starvation); + addWorker = true; } } finally { - hillClimbingThreadAdjustmentLock.Release(); + threadAdjustmentLock.Release(); + } + + if (addWorker) + { + WorkerThread.MaybeAddWorkingWorker(threadPoolInstance); } } - } while ( - needGateThreadForRuntime || - threadPoolInstance._separated.numRequestedWorkers > 0 || - Interlocked.Decrement(ref threadPoolInstance._separated.gateThreadRunningState) > GetRunningStateForNumRuns(0)); + + if (!needGateThreadForRuntime && + threadPoolInstance._separated.numRequestedWorkers <= 0 && + threadPoolInstance._pendingBlockingAdjustment == PendingBlockingAdjustment.None && + Interlocked.Decrement(ref threadPoolInstance._separated.gateThreadRunningState) <= GetRunningStateForNumRuns(0)) + { + break; + } + } } } + public static void Wake(PortableThreadPool threadPoolInstance) + { + DelayEvent.Set(); + EnsureRunning(threadPoolInstance); + } + // called by logic to spawn new worker threads, 
return true if it's been too long // since the last dequeue operation - takes number of worker threads into account // in deciding "too long" private static bool SufficientDelaySinceLastDequeue(PortableThreadPool threadPoolInstance) { - int delay = Environment.TickCount - Volatile.Read(ref threadPoolInstance._separated.lastDequeueTime); - - int minimumDelay; - + uint delay = (uint)(Environment.TickCount - threadPoolInstance._separated.lastDequeueTime); + uint minimumDelay; if (threadPoolInstance._cpuUtilization < CpuUtilizationLow) { - minimumDelay = GateThreadDelayMs; + minimumDelay = GateActivitiesPeriodMs; } else { - ThreadCounts counts = threadPoolInstance._separated.counts.VolatileRead(); - int numThreads = counts.NumThreadsGoal; - minimumDelay = numThreads * DequeueDelayThresholdMs; + minimumDelay = (uint)threadPoolInstance._separated.numThreadsGoal * DequeueDelayThresholdMs; } + return delay > minimumDelay; } @@ -148,7 +206,7 @@ internal static void EnsureRunningSlow(PortableThreadPool threadPoolInstance) int numRunsMask = Interlocked.Exchange(ref threadPoolInstance._separated.gateThreadRunningState, GetRunningStateForNumRuns(MaxRuns)); if (numRunsMask == GetRunningStateForNumRuns(0)) { - s_runGateThreadEvent.Set(); + RunGateThreadEvent.Set(); } else if ((numRunsMask & GateThreadRunningMask) == 0) { @@ -188,6 +246,82 @@ private static void CreateGateThread(PortableThreadPool threadPoolInstance) } } } + + private struct DelayHelper + { + private int _previousGateActivitiesTimeMs; + private int _previousBlockingAdjustmentDelayStartTimeMs; + private uint _previousBlockingAdjustmentDelayMs; + private bool _runGateActivitiesAfterNextDelay; + private bool _adjustForBlockingAfterNextDelay; + + public void SetGateActivitiesTime(int currentTimeMs) + { + _previousGateActivitiesTimeMs = currentTimeMs; + } + + public void SetBlockingAdjustmentTimeAndDelay(int currentTimeMs, uint delayMs) + { + _previousBlockingAdjustmentDelayStartTimeMs = currentTimeMs; + 
_previousBlockingAdjustmentDelayMs = delayMs; + } + + public void ClearBlockingAdjustmentDelay() => _previousBlockingAdjustmentDelayMs = 0; + public bool HasBlockingAdjustmentDelay => _previousBlockingAdjustmentDelayMs != 0; + + public uint GetNextDelay(int currentTimeMs) + { + uint elapsedMsSincePreviousGateActivities = (uint)(currentTimeMs - _previousGateActivitiesTimeMs); + uint nextDelayForGateActivities = + elapsedMsSincePreviousGateActivities < GateActivitiesPeriodMs + ? GateActivitiesPeriodMs - elapsedMsSincePreviousGateActivities + : 1; + if (_previousBlockingAdjustmentDelayMs == 0) + { + _runGateActivitiesAfterNextDelay = true; + _adjustForBlockingAfterNextDelay = false; + return nextDelayForGateActivities; + } + + uint elapsedMsSincePreviousBlockingAdjustmentDelay = + (uint)(currentTimeMs - _previousBlockingAdjustmentDelayStartTimeMs); + uint nextDelayForBlockingAdjustment = + elapsedMsSincePreviousBlockingAdjustmentDelay < _previousBlockingAdjustmentDelayMs + ? _previousBlockingAdjustmentDelayMs - elapsedMsSincePreviousBlockingAdjustmentDelay + : 1; + uint nextDelay = Math.Min(nextDelayForGateActivities, nextDelayForBlockingAdjustment); + _runGateActivitiesAfterNextDelay = nextDelay == nextDelayForGateActivities; + _adjustForBlockingAfterNextDelay = nextDelay == nextDelayForBlockingAdjustment; + Debug.Assert(nextDelay <= GateActivitiesPeriodMs); + return nextDelay; + } + + public bool ShouldPerformGateActivities(int currentTimeMs, bool wasSignaledToWake) + { + bool result = + (!wasSignaledToWake && _runGateActivitiesAfterNextDelay) || + (uint)(currentTimeMs - _previousGateActivitiesTimeMs) >= GateActivitiesPeriodMs; + if (result) + { + SetGateActivitiesTime(currentTimeMs); + } + return result; + } + + public bool HasBlockingAdjustmentDelayElapsed(int currentTimeMs, bool wasSignaledToWake) + { + Debug.Assert(HasBlockingAdjustmentDelay); + + if (!wasSignaledToWake && _adjustForBlockingAfterNextDelay) + { + return true; + } + + uint 
elapsedMsSincePreviousBlockingAdjustmentDelay = + (uint)(currentTimeMs - _previousBlockingAdjustmentDelayStartTimeMs); + return elapsedMsSincePreviousBlockingAdjustmentDelay >= _previousBlockingAdjustmentDelayMs; + } + } } internal static void EnsureGateThreadRunning() => GateThread.EnsureRunning(ThreadPoolInstance); diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.HillClimbing.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.HillClimbing.cs index aead56b38acab..3a2b10096887d 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.HillClimbing.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.HillClimbing.cs @@ -33,6 +33,7 @@ public enum StateOrTransition Stabilizing, Starvation, ThreadTimedOut, + CooperativeBlocking, } // SOS's ThreadPool command depends on the names of all fields @@ -321,10 +322,12 @@ public HillClimbing() newThreadWaveMagnitude = Math.Max(newThreadWaveMagnitude, 1); // - // Make sure our control setting is within the ThreadPool's limits + // Make sure our control setting is within the ThreadPool's limits. When some threads are blocked due to + // cooperative blocking, ensure that hill climbing does not decrease the thread count below the expected + // minimum. // int maxThreads = threadPoolInstance._maxThreads; - int minThreads = threadPoolInstance._minThreads; + int minThreads = threadPoolInstance.MinThreadsGoal; _currentControlSetting = Math.Min(maxThreads - newThreadWaveMagnitude, _currentControlSetting); _currentControlSetting = Math.Max(minThreads, _currentControlSetting); @@ -355,7 +358,11 @@ public HillClimbing() // If all of this caused an actual change in thread count, log that as well. 
// if (newThreadCount != currentThreadCount) + { ChangeThreadCount(newThreadCount, state); + _secondsElapsedSinceLastChange = 0; + _completionsSinceLastChange = 0; + } // // Return the new thread count and sample interval. This is randomized to prevent correlations with other periodic @@ -377,11 +384,14 @@ public HillClimbing() private void ChangeThreadCount(int newThreadCount, StateOrTransition state) { _lastThreadCount = newThreadCount; - _currentSampleMs = _randomIntervalGenerator.Next(_sampleIntervalMsLow, _sampleIntervalMsHigh + 1); + + if (state != StateOrTransition.CooperativeBlocking) // this can be noisy + { + _currentSampleMs = _randomIntervalGenerator.Next(_sampleIntervalMsLow, _sampleIntervalMsHigh + 1); + } + double throughput = _secondsElapsedSinceLastChange > 0 ? _completionsSinceLastChange / _secondsElapsedSinceLastChange : 0; LogTransition(newThreadCount, throughput, state); - _secondsElapsedSinceLastChange = 0; - _completionsSinceLastChange = 0; } private void LogTransition(int newThreadCount, double throughput, StateOrTransition stateOrTransition) diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.ThreadCounts.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.ThreadCounts.cs index 1c0c121847387..d4673f5cd7329 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.ThreadCounts.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.ThreadCounts.cs @@ -16,15 +16,14 @@ private struct ThreadCounts // SOS's ThreadPool command depends on this layout private const byte NumProcessingWorkShift = 0; private const byte NumExistingThreadsShift = 16; - private const byte NumThreadsGoalShift = 32; - private ulong _data; // SOS's ThreadPool command depends on this name + private uint _data; // SOS's ThreadPool command depends on this name - private ThreadCounts(ulong data) => _data = data; + private ThreadCounts(uint data) => 
_data = data; private short GetInt16Value(byte shift) => (short)(_data >> shift); private void SetInt16Value(short value, byte shift) => - _data = (_data & ~((ulong)ushort.MaxValue << shift)) | ((ulong)(ushort)value << shift); + _data = (_data & ~((uint)ushort.MaxValue << shift)) | ((uint)(ushort)value << shift); /// /// Number of threads processing work items. @@ -44,7 +43,15 @@ public void SubtractNumProcessingWork(short value) Debug.Assert(value >= 0); Debug.Assert(value <= NumProcessingWork); - _data -= (ulong)(ushort)value << NumProcessingWorkShift; + _data -= (uint)(ushort)value << NumProcessingWorkShift; + } + + public void InterlockedDecrementNumProcessingWork() + { + Debug.Assert(NumProcessingWorkShift == 0); + + ThreadCounts counts = new ThreadCounts(Interlocked.Decrement(ref _data)); + Debug.Assert(counts.NumProcessingWork >= 0); } /// @@ -65,20 +72,7 @@ public void SubtractNumExistingThreads(short value) Debug.Assert(value >= 0); Debug.Assert(value <= NumExistingThreads); - _data -= (ulong)(ushort)value << NumExistingThreadsShift; - } - - /// - /// Max possible thread pool threads we want to have. - /// - public short NumThreadsGoal - { - get => GetInt16Value(NumThreadsGoalShift); - set - { - Debug.Assert(value > 0); - SetInt16Value(value, NumThreadsGoalShift); - } + _data -= (uint)(ushort)value << NumExistingThreadsShift; } public ThreadCounts VolatileRead() => new ThreadCounts(Volatile.Read(ref _data)); @@ -90,7 +84,7 @@ public ThreadCounts InterlockedCompareExchange(ThreadCounts newCounts, ThreadCou public static bool operator !=(ThreadCounts lhs, ThreadCounts rhs) => lhs._data != rhs._data; public override bool Equals([NotNullWhen(true)] object? 
obj) => obj is ThreadCounts other && _data == other._data; - public override int GetHashCode() => (int)_data + (int)(_data >> 32); + public override int GetHashCode() => (int)_data; } } } diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.WorkerThread.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.WorkerThread.cs index 917a1113f5963..8b1a44946d10f 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.WorkerThread.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.WorkerThread.cs @@ -12,6 +12,11 @@ internal sealed partial class PortableThreadPool /// private static class WorkerThread { + // This value represents an assumption of how much uncommited stack space a worker thread may use in the future. + // Used in calculations to estimate when to throttle the rate of thread injection to reduce the possibility of + // preexisting threads from running out of memory when using new stack space in low-memory situations. + public const int EstimatedAdditionalStackUsagePerThreadBytes = 64 << 10; // 64 KB + /// /// Semaphore for controlling how many threads are currently working. 
/// @@ -43,7 +48,7 @@ private static void WorkerThreadStart() (uint)threadPoolInstance._separated.counts.VolatileRead().NumExistingThreads); } - LowLevelLock hillClimbingThreadAdjustmentLock = threadPoolInstance._hillClimbingThreadAdjustmentLock; + LowLevelLock threadAdjustmentLock = threadPoolInstance._threadAdjustmentLock; LowLevelLifoSemaphore semaphore = s_semaphore; while (true) @@ -54,7 +59,7 @@ private static void WorkerThreadStart() bool alreadyRemovedWorkingWorker = false; while (TakeActiveRequest(threadPoolInstance)) { - Volatile.Write(ref threadPoolInstance._separated.lastDequeueTime, Environment.TickCount); + threadPoolInstance._separated.lastDequeueTime = Environment.TickCount; if (!ThreadPoolWorkQueue.Dispatch()) { // ShouldStopProcessingWorkNow() caused the thread to stop processing work, and it would have @@ -96,14 +101,14 @@ private static void WorkerThreadStart() } } - hillClimbingThreadAdjustmentLock.Acquire(); + threadAdjustmentLock.Acquire(); try { // At this point, the thread's wait timed out. We are shutting down this thread. // We are going to decrement the number of exisiting threads to no longer include this one // and then change the max number of threads in the thread pool to reflect that we don't need as many // as we had. Finally, we are going to tell hill climbing that we changed the max number of threads. 
- ThreadCounts counts = threadPoolInstance._separated.counts.VolatileRead(); + ThreadCounts counts = threadPoolInstance._separated.counts; while (true) { // Since this thread is currently registered as an existing thread, if more work comes in meanwhile, @@ -119,13 +124,21 @@ private static void WorkerThreadStart() ThreadCounts newCounts = counts; newCounts.SubtractNumExistingThreads(1); short newNumExistingThreads = (short)(numExistingThreads - 1); - short newNumThreadsGoal = Math.Max(threadPoolInstance._minThreads, Math.Min(newNumExistingThreads, newCounts.NumThreadsGoal)); - newCounts.NumThreadsGoal = newNumThreadsGoal; ThreadCounts oldCounts = threadPoolInstance._separated.counts.InterlockedCompareExchange(newCounts, counts); if (oldCounts == counts) { - HillClimbing.ThreadPoolHillClimber.ForceChange(newNumThreadsGoal, HillClimbing.StateOrTransition.ThreadTimedOut); + short newNumThreadsGoal = + Math.Max( + threadPoolInstance.MinThreadsGoal, + Math.Min(newNumExistingThreads, threadPoolInstance._separated.numThreadsGoal)); + if (threadPoolInstance._separated.numThreadsGoal != newNumThreadsGoal) + { + threadPoolInstance._separated.numThreadsGoal = newNumThreadsGoal; + HillClimbing.ThreadPoolHillClimber.ForceChange( + newNumThreadsGoal, + HillClimbing.StateOrTransition.ThreadTimedOut); + } if (NativeRuntimeEventSource.Log.IsEnabled()) { @@ -139,7 +152,7 @@ private static void WorkerThreadStart() } finally { - hillClimbingThreadAdjustmentLock.Release(); + threadAdjustmentLock.Release(); } } } @@ -149,19 +162,7 @@ private static void WorkerThreadStart() /// private static void RemoveWorkingWorker(PortableThreadPool threadPoolInstance) { - ThreadCounts currentCounts = threadPoolInstance._separated.counts.VolatileRead(); - while (true) - { - ThreadCounts newCounts = currentCounts; - newCounts.SubtractNumProcessingWork(1); - ThreadCounts oldCounts = threadPoolInstance._separated.counts.InterlockedCompareExchange(newCounts, currentCounts); - - if (oldCounts == 
currentCounts) - { - break; - } - currentCounts = oldCounts; - } + threadPoolInstance._separated.counts.InterlockedDecrementNumProcessingWork(); // It's possible that we decided we had thread requests just before a request came in, // but reduced the worker count *after* the request came in. In this case, we might @@ -175,12 +176,12 @@ private static void RemoveWorkingWorker(PortableThreadPool threadPoolInstance) internal static void MaybeAddWorkingWorker(PortableThreadPool threadPoolInstance) { - ThreadCounts counts = threadPoolInstance._separated.counts.VolatileRead(); + ThreadCounts counts = threadPoolInstance._separated.counts; short numExistingThreads, numProcessingWork, newNumExistingThreads, newNumProcessingWork; while (true) { numProcessingWork = counts.NumProcessingWork; - if (numProcessingWork >= counts.NumThreadsGoal) + if (numProcessingWork >= threadPoolInstance._separated.numThreadsGoal) { return; } @@ -219,7 +220,7 @@ internal static void MaybeAddWorkingWorker(PortableThreadPool threadPoolInstance continue; } - counts = threadPoolInstance._separated.counts.VolatileRead(); + counts = threadPoolInstance._separated.counts; while (true) { ThreadCounts newCounts = counts; @@ -245,17 +246,17 @@ internal static void MaybeAddWorkingWorker(PortableThreadPool threadPoolInstance /// Whether or not this thread should stop processing work even if there is still work in the queue. internal static bool ShouldStopProcessingWorkNow(PortableThreadPool threadPoolInstance) { - ThreadCounts counts = threadPoolInstance._separated.counts.VolatileRead(); + ThreadCounts counts = threadPoolInstance._separated.counts; while (true) { - // When there are more threads processing work than the thread count goal, hill climbing must have decided + // When there are more threads processing work than the thread count goal, it may have been decided // to decrease the number of threads. Stop processing if the counts can be updated. 
We may have more // threads existing than the thread count goal and that is ok, the cold ones will eventually time out if // the thread count goal is not increased again. This logic is a bit different from the original CoreCLR // code from which this implementation was ported, which turns a processing thread into a retired thread // and checks for pending requests like RemoveWorkingWorker. In this implementation there are // no retired threads, so only the count of threads processing work is considered. - if (counts.NumProcessingWork <= counts.NumThreadsGoal) + if (counts.NumProcessingWork <= threadPoolInstance._separated.numThreadsGoal) { return false; } diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.cs index b18e963f36066..8c6163fa2f7d2 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.cs @@ -28,8 +28,10 @@ internal sealed partial class PortableThreadPool private const int CpuUtilizationHigh = 95; private const int CpuUtilizationLow = 80; - private static readonly short s_forcedMinWorkerThreads = AppContextConfigHelper.GetInt16Config("System.Threading.ThreadPool.MinThreads", 0, false); - private static readonly short s_forcedMaxWorkerThreads = AppContextConfigHelper.GetInt16Config("System.Threading.ThreadPool.MaxThreads", 0, false); + private static readonly short ForcedMinWorkerThreads = + AppContextConfigHelper.GetInt16Config("System.Threading.ThreadPool.MinThreads", 0, false); + private static readonly short ForcedMaxWorkerThreads = + AppContextConfigHelper.GetInt16Config("System.Threading.ThreadPool.MaxThreads", 0, false); [ThreadStatic] private static object? 
t_completionCountObject; @@ -43,43 +45,55 @@ internal sealed partial class PortableThreadPool private int _cpuUtilization; // SOS's ThreadPool command depends on this name private short _minThreads; private short _maxThreads; - private readonly LowLevelLock _maxMinThreadLock = new LowLevelLock(); [StructLayout(LayoutKind.Explicit, Size = Internal.PaddingHelpers.CACHE_LINE_SIZE * 6)] private struct CacheLineSeparated { [FieldOffset(Internal.PaddingHelpers.CACHE_LINE_SIZE * 1)] public ThreadCounts counts; // SOS's ThreadPool command depends on this name + [FieldOffset(Internal.PaddingHelpers.CACHE_LINE_SIZE * 1 + sizeof(uint))] + public short numThreadsGoal; + [FieldOffset(Internal.PaddingHelpers.CACHE_LINE_SIZE * 2)] public int lastDequeueTime; + [FieldOffset(Internal.PaddingHelpers.CACHE_LINE_SIZE * 3)] public int priorCompletionCount; [FieldOffset(Internal.PaddingHelpers.CACHE_LINE_SIZE * 3 + sizeof(int))] public int priorCompletedWorkRequestsTime; [FieldOffset(Internal.PaddingHelpers.CACHE_LINE_SIZE * 3 + sizeof(int) * 2)] public int nextCompletedWorkRequestsTime; + [FieldOffset(Internal.PaddingHelpers.CACHE_LINE_SIZE * 4)] public volatile int numRequestedWorkers; - [FieldOffset(Internal.PaddingHelpers.CACHE_LINE_SIZE * 5)] + [FieldOffset(Internal.PaddingHelpers.CACHE_LINE_SIZE * 4 + sizeof(int))] public int gateThreadRunningState; } - private CacheLineSeparated _separated; // SOS's ThreadPool command depends on this name private long _currentSampleStartTime; private readonly ThreadInt64PersistentCounter _completionCounter = new ThreadInt64PersistentCounter(); private int _threadAdjustmentIntervalMs; - private readonly LowLevelLock _hillClimbingThreadAdjustmentLock = new LowLevelLock(); + private short _numBlockedThreads; + private short _numThreadsAddedDueToBlocking; + private PendingBlockingAdjustment _pendingBlockingAdjustment; + + private long _memoryUsageBytes; + private long _memoryLimitBytes; + + private readonly LowLevelLock _threadAdjustmentLock = new 
LowLevelLock(); + + private CacheLineSeparated _separated; // SOS's ThreadPool command depends on this name private PortableThreadPool() { - _minThreads = s_forcedMinWorkerThreads > 0 ? s_forcedMinWorkerThreads : (short)Environment.ProcessorCount; + _minThreads = ForcedMinWorkerThreads > 0 ? ForcedMinWorkerThreads : (short)Environment.ProcessorCount; if (_minThreads > MaxPossibleThreadCount) { _minThreads = MaxPossibleThreadCount; } - _maxThreads = s_forcedMaxWorkerThreads > 0 ? s_forcedMaxWorkerThreads : DefaultMaxWorkerThreadCount; + _maxThreads = ForcedMaxWorkerThreads > 0 ? ForcedMaxWorkerThreads : DefaultMaxWorkerThreadCount; if (_maxThreads > MaxPossibleThreadCount) { _maxThreads = MaxPossibleThreadCount; @@ -89,13 +103,7 @@ private PortableThreadPool() _maxThreads = _minThreads; } - _separated = new CacheLineSeparated - { - counts = new ThreadCounts - { - NumThreadsGoal = _minThreads - } - }; + _separated.numThreadsGoal = _minThreads; } public bool SetMinThreads(int workerThreads, int ioCompletionThreads) @@ -105,7 +113,10 @@ public bool SetMinThreads(int workerThreads, int ioCompletionThreads) return false; } - _maxMinThreadLock.Acquire(); + bool addWorker = false; + bool wakeGateThread = false; + + _threadAdjustmentLock.Acquire(); try { if (workerThreads > _maxThreads || !ThreadPool.CanSetMinIOCompletionThreads(ioCompletionThreads)) @@ -115,39 +126,45 @@ public bool SetMinThreads(int workerThreads, int ioCompletionThreads) ThreadPool.SetMinIOCompletionThreads(ioCompletionThreads); - if (s_forcedMinWorkerThreads != 0) + if (ForcedMinWorkerThreads != 0) { return true; } short newMinThreads = (short)Math.Max(1, Math.Min(workerThreads, MaxPossibleThreadCount)); _minThreads = newMinThreads; - - ThreadCounts counts = _separated.counts.VolatileRead(); - while (counts.NumThreadsGoal < newMinThreads) + if (_numBlockedThreads > 0) { - ThreadCounts newCounts = counts; - newCounts.NumThreadsGoal = newMinThreads; - - ThreadCounts oldCounts = 
_separated.counts.InterlockedCompareExchange(newCounts, counts); - if (oldCounts == counts) + // Blocking adjustment will adjust the goal according to its heuristics + if (_pendingBlockingAdjustment != PendingBlockingAdjustment.Immediately) { - if (_separated.numRequestedWorkers > 0) - { - WorkerThread.MaybeAddWorkingWorker(this); - } - break; + _pendingBlockingAdjustment = PendingBlockingAdjustment.Immediately; + wakeGateThread = true; + } + } + else if (_separated.numThreadsGoal < newMinThreads) + { + _separated.numThreadsGoal = newMinThreads; + if (_separated.numRequestedWorkers > 0) + { + addWorker = true; } - - counts = oldCounts; } - - return true; } finally { - _maxMinThreadLock.Release(); + _threadAdjustmentLock.Release(); } + + if (addWorker) + { + WorkerThread.MaybeAddWorkingWorker(this); + } + else if (wakeGateThread) + { + GateThread.Wake(this); + } + return true; } public int GetMinThreads() => Volatile.Read(ref _minThreads); @@ -159,7 +176,7 @@ public bool SetMaxThreads(int workerThreads, int ioCompletionThreads) return false; } - _maxMinThreadLock.Acquire(); + _threadAdjustmentLock.Acquire(); try { if (workerThreads < _minThreads || !ThreadPool.CanSetMaxIOCompletionThreads(ioCompletionThreads)) @@ -169,34 +186,22 @@ public bool SetMaxThreads(int workerThreads, int ioCompletionThreads) ThreadPool.SetMaxIOCompletionThreads(ioCompletionThreads); - if (s_forcedMaxWorkerThreads != 0) + if (ForcedMaxWorkerThreads != 0) { return true; } short newMaxThreads = (short)Math.Min(workerThreads, MaxPossibleThreadCount); _maxThreads = newMaxThreads; - - ThreadCounts counts = _separated.counts.VolatileRead(); - while (counts.NumThreadsGoal > newMaxThreads) + if (_separated.numThreadsGoal > newMaxThreads) { - ThreadCounts newCounts = counts; - newCounts.NumThreadsGoal = newMaxThreads; - - ThreadCounts oldCounts = _separated.counts.InterlockedCompareExchange(newCounts, counts); - if (oldCounts == counts) - { - break; - } - - counts = oldCounts; + 
_separated.numThreadsGoal = newMaxThreads; } - return true; } finally { - _maxMinThreadLock.Release(); + _threadAdjustmentLock.Release(); } } @@ -232,7 +237,7 @@ private object CreateThreadLocalCompletionCountObject() private void NotifyWorkItemProgress(object threadLocalCompletionCountObject, int currentTimeMs) { ThreadInt64PersistentCounter.Increment(threadLocalCompletionCountObject); - Volatile.Write(ref _separated.lastDequeueTime, Environment.TickCount); + _separated.lastDequeueTime = currentTimeMs; if (ShouldAdjustMaxWorkersActive(currentTimeMs)) { @@ -257,15 +262,23 @@ internal bool NotifyWorkItemComplete(object? threadLocalCompletionCountObject, i // private void AdjustMaxWorkersActive() { - LowLevelLock hillClimbingThreadAdjustmentLock = _hillClimbingThreadAdjustmentLock; - if (!hillClimbingThreadAdjustmentLock.TryAcquire()) + LowLevelLock threadAdjustmentLock = _threadAdjustmentLock; + if (!threadAdjustmentLock.TryAcquire()) { // The lock is held by someone else, they will take care of this for us return; } + bool addWorker = false; try { + // Skip hill climbing when there is a pending blocking adjustment. Hill climbing may otherwise bypass the + // blocking adjustment heuristics and increase the thread count too quickly. 
+ if (_pendingBlockingAdjustment != PendingBlockingAdjustment.None) + { + return; + } + long startTime = _currentSampleStartTime; long endTime = Stopwatch.GetTimestamp(); long freq = Stopwatch.Frequency; @@ -278,39 +291,24 @@ private void AdjustMaxWorkersActive() int totalNumCompletions = (int)_completionCounter.Count; int numCompletions = totalNumCompletions - _separated.priorCompletionCount; - ThreadCounts currentCounts = _separated.counts.VolatileRead(); - int newMax; - (newMax, _threadAdjustmentIntervalMs) = HillClimbing.ThreadPoolHillClimber.Update(currentCounts.NumThreadsGoal, elapsedSeconds, numCompletions); - - while (newMax != currentCounts.NumThreadsGoal) + int newNumThreadsGoal; + (newNumThreadsGoal, _threadAdjustmentIntervalMs) = + HillClimbing.ThreadPoolHillClimber.Update(_separated.numThreadsGoal, elapsedSeconds, numCompletions); + short oldNumThreadsGoal = _separated.numThreadsGoal; + if (oldNumThreadsGoal != (short)newNumThreadsGoal) { - ThreadCounts newCounts = currentCounts; - newCounts.NumThreadsGoal = (short)newMax; - - ThreadCounts oldCounts = _separated.counts.InterlockedCompareExchange(newCounts, currentCounts); - if (oldCounts == currentCounts) - { - // - // If we're increasing the max, inject a thread. If that thread finds work, it will inject - // another thread, etc., until nobody finds work or we reach the new maximum. - // - // If we're reducing the max, whichever threads notice this first will sleep and timeout themselves. - // - if (newMax > oldCounts.NumThreadsGoal) - { - WorkerThread.MaybeAddWorkingWorker(this); - } - break; - } - - if (oldCounts.NumThreadsGoal > currentCounts.NumThreadsGoal && oldCounts.NumThreadsGoal >= newMax) + _separated.numThreadsGoal = (short)newNumThreadsGoal; + + // + // If we're increasing the goal, inject a thread. If that thread finds work, it will inject + // another thread, etc., until nobody finds work or we reach the new goal. 
+ // + // If we're reducing the goal, whichever threads notice this first will sleep and timeout themselves. + // + if (newNumThreadsGoal > oldNumThreadsGoal) { - // someone (probably the gate thread) increased the thread count more than - // we are about to do. Don't interfere. - break; + addWorker = true; } - - currentCounts = oldCounts; } _separated.priorCompletionCount = totalNumCompletions; @@ -321,12 +319,22 @@ private void AdjustMaxWorkersActive() } finally { - hillClimbingThreadAdjustmentLock.Release(); + threadAdjustmentLock.Release(); + } + + if (addWorker) + { + WorkerThread.MaybeAddWorkingWorker(this); } } private bool ShouldAdjustMaxWorkersActive(int currentTimeMs) { + if (HillClimbing.IsDisabled) + { + return false; + } + // We need to subtract by prior time because Environment.TickCount can wrap around, making a comparison of absolute // times unreliable. Intervals are unsigned to avoid wrapping around on the subtract after enough time elapses, and // this also prevents the initial elapsed interval from being negative due to the prior and next times being @@ -334,19 +342,26 @@ private bool ShouldAdjustMaxWorkersActive(int currentTimeMs) int priorTime = Volatile.Read(ref _separated.priorCompletedWorkRequestsTime); uint requiredInterval = (uint)(_separated.nextCompletedWorkRequestsTime - priorTime); uint elapsedInterval = (uint)(currentTimeMs - priorTime); - if (elapsedInterval >= requiredInterval) + if (elapsedInterval < requiredInterval) + { + return false; + } + + // Avoid trying to adjust the thread count goal if there are already more threads than the thread count goal. + // In that situation, hill climbing must have previously decided to decrease the thread count goal, so let's + // wait until the system responds to that change before calling into hill climbing again. 
This condition should + // be the opposite of the condition in WorkerThread.ShouldStopProcessingWorkNow that causes + // threads processing work to stop in response to a decreased thread count goal. The logic here is a bit + // different from the original CoreCLR code from which this implementation was ported because in this + // implementation there are no retired threads, so only the count of threads processing work is considered. + if (_separated.counts.NumProcessingWork > _separated.numThreadsGoal) { - // Avoid trying to adjust the thread count goal if there are already more threads than the thread count goal. - // In that situation, hill climbing must have previously decided to decrease the thread count goal, so let's - // wait until the system responds to that change before calling into hill climbing again. This condition should - // be the opposite of the condition in WorkerThread.ShouldStopProcessingWorkNow that causes - // threads processing work to stop in response to a decreased thread count goal. The logic here is a bit - // different from the original CoreCLR code from which this implementation was ported because in this - // implementation there are no retired threads, so only the count of threads processing work is considered. - ThreadCounts counts = _separated.counts.VolatileRead(); - return counts.NumProcessingWork <= counts.NumThreadsGoal && !HillClimbing.IsDisabled; + return false; } - return false; + + // Skip hill climbing when there is a pending blocking adjustment. Hill climbing may otherwise bypass the + // blocking adjustment heuristics and increase the thread count too quickly. + return _pendingBlockingAdjustment == PendingBlockingAdjustment.None; } internal void RequestWorker() @@ -357,5 +372,16 @@ internal void RequestWorker() WorkerThread.MaybeAddWorkingWorker(this); GateThread.EnsureRunning(this); } + + private bool OnGen2GCCallback() + { + // Gen 2 GCs may be very infrequent in some cases. 
If it becomes an issue, consider updating the memory usage more + // frequently. The memory usage is only used for fallback purposes in blocking adjustment, so an artificially higher + // memory usage may cause blocking adjustment to fall back to slower adjustments sooner than necessary. + GCMemoryInfo gcMemoryInfo = GC.GetGCMemoryInfo(); + _memoryLimitBytes = gcMemoryInfo.HighMemoryLoadThresholdBytes; + _memoryUsageBytes = Math.Min(gcMemoryInfo.MemoryLoadBytes, gcMemoryInfo.HighMemoryLoadThresholdBytes); + return true; // continue receiving gen 2 GC callbacks + } } } diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/Task.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/Task.cs index 53f0aebf2db20..923c4cf71bcf4 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/Task.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/Task.cs @@ -2973,14 +2973,36 @@ private bool SpinThenBlockingWait(int millisecondsTimeout, CancellationToken can #pragma warning disable CA1416 // Validate platform compatibility, issue: https://github.com/dotnet/runtime/issues/44622 if (infiniteWait) { - returnValue = mres.Wait(Timeout.Infinite, cancellationToken); + bool notifyWhenUnblocked = ThreadPool.NotifyThreadBlocked(); + try + { + returnValue = mres.Wait(Timeout.Infinite, cancellationToken); + } + finally + { + if (notifyWhenUnblocked) + { + ThreadPool.NotifyThreadUnblocked(); + } + } } else { uint elapsedTimeTicks = ((uint)Environment.TickCount) - startTimeTicks; if (elapsedTimeTicks < millisecondsTimeout) { - returnValue = mres.Wait((int)(millisecondsTimeout - elapsedTimeTicks), cancellationToken); + bool notifyWhenUnblocked = ThreadPool.NotifyThreadBlocked(); + try + { + returnValue = mres.Wait((int)(millisecondsTimeout - elapsedTimeTicks), cancellationToken); + } + finally + { + if (notifyWhenUnblocked) + { + ThreadPool.NotifyThreadUnblocked(); + } + } } } #pragma warning restore CA1416 diff --git 
a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPool.Portable.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPool.Portable.cs index 08067113171c0..70ac108e32d74 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPool.Portable.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPool.Portable.cs @@ -100,6 +100,9 @@ public static void GetAvailableThreads(out int workerThreads, out int completion internal static bool NotifyWorkItemComplete(object? threadLocalCompletionCountObject, int currentTimeMs) => PortableThreadPool.ThreadPoolInstance.NotifyWorkItemComplete(threadLocalCompletionCountObject, currentTimeMs); + internal static bool NotifyThreadBlocked() => PortableThreadPool.ThreadPoolInstance.NotifyThreadBlocked(); + internal static void NotifyThreadUnblocked() => PortableThreadPool.ThreadPoolInstance.NotifyThreadUnblocked(); + internal static object GetOrCreateThreadLocalCompletionCountObject() => PortableThreadPool.ThreadPoolInstance.GetOrCreateThreadLocalCompletionCountObject(); diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs index f08e4ba4d7fe0..e0dfdc951cf87 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs @@ -725,7 +725,12 @@ internal static bool Dispatch() // int currentTickCount = Environment.TickCount; if (!ThreadPool.NotifyWorkItemComplete(threadLocalCompletionCountObject, currentTickCount)) + { + // This thread is being parked and may remain inactive for a while. Transfer any thread-local work items + // to ensure that they would not be heavily delayed. 
+ tl.TransferLocalWork(); return false; + } // Check if the dispatch quantum has expired if ((uint)(currentTickCount - startTickCount) < DispatchQuantumMs) @@ -824,21 +829,20 @@ public ThreadPoolWorkQueueThreadLocals(ThreadPoolWorkQueue tpq) threadLocalCompletionCountObject = ThreadPool.GetOrCreateThreadLocalCompletionCountObject(); } + public void TransferLocalWork() + { + while (workStealingQueue.LocalPop() is object cb) + { + workQueue.Enqueue(cb, forceGlobal: true); + } + } + ~ThreadPoolWorkQueueThreadLocals() { // Transfer any pending workitems into the global queue so that they will be executed by another thread if (null != workStealingQueue) { - if (null != workQueue) - { - object? cb; - while ((cb = workStealingQueue.LocalPop()) != null) - { - Debug.Assert(null != cb); - workQueue.Enqueue(cb, forceGlobal: true); - } - } - + TransferLocalWork(); ThreadPoolWorkQueue.WorkStealingQueueList.Remove(workStealingQueue); } } diff --git a/src/libraries/System.Threading.ThreadPool/tests/ThreadPoolTests.cs b/src/libraries/System.Threading.ThreadPool/tests/ThreadPoolTests.cs index fbd515aecb7f4..767e5bef92c16 100644 --- a/src/libraries/System.Threading.ThreadPool/tests/ThreadPoolTests.cs +++ b/src/libraries/System.Threading.ThreadPool/tests/ThreadPoolTests.cs @@ -3,6 +3,7 @@ using System.Collections.Generic; using System.Linq; +using System.Reflection; using System.Threading.Tasks; using System.Threading.Tests; using Microsoft.DotNet.RemoteExecutor; @@ -885,6 +886,69 @@ public static void ThreadPoolThreadCreationDoesNotTransferExecutionContext() }).Dispose(); } + [ConditionalFact(nameof(IsThreadingAndRemoteExecutorSupported))] + public static void CooperativeBlockingCanCreateThreadsFaster() + { + // Run in a separate process to test in a clean thread pool environment such that work items queued by the test + // would cause the thread pool to create threads + RemoteExecutor.Invoke(() => + { + // All but the last of these work items will block and the last queued work 
item would release the blocking. + // Without cooperative blocking, this would lead to starvation after work items run. Since + // starvation adds threads at a rate of at most 2 per second, the extra 120 work items would take roughly 60 + // seconds to get unblocked and since the test waits for 30 seconds it would time out. Cooperative blocking is + // configured below to increase the rate of thread injection for testing purposes while getting a decent amount + // of coverage for its behavior. With cooperative blocking as configured below, the test should finish within a + // few seconds. + int processorCount = Environment.ProcessorCount; + int workItemCount = processorCount + 120; + SetBlockingConfigValue("ThreadsToAddWithoutDelay_ProcCountFactor", 1); + SetBlockingConfigValue("MaxDelayMs", 1); + + var allWorkItemsUnblocked = new AutoResetEvent(false); + + // Run a second iteration for some extra coverage. Iterations after the first one would be much faster because + // the necessary number of threads would already have been created by then, and would not add much to the test + // time. + for (int iterationIndex = 0; iterationIndex < 2; ++iterationIndex) + { + var tcs = new TaskCompletionSource(); + int unblockedThreadCount = 0; + + Action blockingWorkItem = _ => + { + tcs.Task.Wait(); + if (Interlocked.Increment(ref unblockedThreadCount) == workItemCount - 1) + { + allWorkItemsUnblocked.Set(); + } + }; + + for (int i = 0; i < workItemCount - 1; ++i) + { + ThreadPool.UnsafeQueueUserWorkItem(blockingWorkItem, 0, preferLocal: false); + } + + Action unblockingWorkItem = _ => tcs.SetResult(0); + ThreadPool.UnsafeQueueUserWorkItem(unblockingWorkItem, 0, preferLocal: false); + Assert.True(allWorkItemsUnblocked.WaitOne(30_000)); + } + + void SetBlockingConfigValue(string name, int value) => + AppContextSetData("System.Threading.ThreadPool.Blocking." 
+ name, value); + + void AppContextSetData(string name, object value) + { + typeof(AppContext).InvokeMember( + "SetData", + BindingFlags.ExactBinding | BindingFlags.InvokeMethod | BindingFlags.Public | BindingFlags.Static, + null, + null, + new object[] { name, value }); + } + }).Dispose(); + } + public static bool IsThreadingAndRemoteExecutorSupported => PlatformDetection.IsThreadingSupported && RemoteExecutor.IsSupported; } diff --git a/src/mono/System.Private.CoreLib/src/System/Threading/ThreadPool.Browser.Mono.cs b/src/mono/System.Private.CoreLib/src/System/Threading/ThreadPool.Browser.Mono.cs index df6112fa3696c..9cfc8c91999b5 100644 --- a/src/mono/System.Private.CoreLib/src/System/Threading/ThreadPool.Browser.Mono.cs +++ b/src/mono/System.Private.CoreLib/src/System/Threading/ThreadPool.Browser.Mono.cs @@ -90,6 +90,12 @@ internal static bool NotifyWorkItemComplete(object? threadLocalCompletionCountOb return true; } + internal static bool NotifyThreadBlocked() => false; + + internal static void NotifyThreadUnblocked() + { + } + internal static object? GetOrCreateThreadLocalCompletionCountObject() => null; private static RegisteredWaitHandle RegisterWaitForSingleObject(