Skip to content

Commit

Permalink
Address feedback, add an assertion
Browse files Browse the repository at this point in the history
  • Loading branch information
kouvel committed Jun 2, 2021
1 parent 984888f commit 56bd4d9
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -522,20 +522,13 @@ internal static void NotifyWorkItemProgress()
[MethodImpl(MethodImplOptions.InternalCall)]
private static extern void NotifyWorkItemProgressNative();

internal static void NotifyThreadBlocked()
{
if (UsePortableThreadPool)
{
PortableThreadPool.ThreadPoolInstance.NotifyThreadBlocked();
}
}
internal static bool NotifyThreadBlocked() =>
UsePortableThreadPool && PortableThreadPool.ThreadPoolInstance.NotifyThreadBlocked();

internal static void NotifyThreadUnblocked()
{
if (UsePortableThreadPool)
{
PortableThreadPool.ThreadPoolInstance.NotifyThreadUnblocked();
}
Debug.Assert(UsePortableThreadPool);
PortableThreadPool.ThreadPoolInstance.NotifyThreadUnblocked();
}

internal static object? GetOrCreateThreadLocalCompletionCountObject() =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,10 @@ private short TargetThreadsGoalForBlockingAdjustment
{
_threadAdjustmentLock.VerifyIsLocked();

short targetThreadsGoal = _minThreads;
if (_numBlockedThreads <= 0)
{
return targetThreadsGoal;
}

short maxThreads = MaxThreadsForBlockingAdjustment;
targetThreadsGoal += _numBlockedThreads;
if (targetThreadsGoal < _numBlockedThreads || targetThreadsGoal > maxThreads)
{
targetThreadsGoal = maxThreads;
}
return targetThreadsGoal;
return
_numBlockedThreads <= 0
? _minThreads
: (short)Math.Min((ushort)(_minThreads + _numBlockedThreads), (ushort)MaxThreadsForBlockingAdjustment);
}
}

Expand All @@ -43,17 +34,16 @@ private short MaxThreadsForBlockingAdjustment
get
{
_threadAdjustmentLock.VerifyIsLocked();

short result = (short)(_minThreads + BlockingConfig.MaxThreadsToAddBeforeFallback);
return result < BlockingConfig.MaxThreadsToAddBeforeFallback || result > _maxThreads ? _maxThreads : result;
return
(short)Math.Min((ushort)(_minThreads + BlockingConfig.MaxThreadsToAddBeforeFallback), (ushort)_maxThreads);
}
}

public void NotifyThreadBlocked()
public bool NotifyThreadBlocked()
{
if (!BlockingConfig.IsCooperativeBlockingEnabled || !Thread.CurrentThread.IsThreadPoolThread)
{
return;
return false;
}

bool wakeGateThread = false;
Expand Down Expand Up @@ -82,14 +72,13 @@ public void NotifyThreadBlocked()
{
GateThread.Wake(this);
}
return true;
}

public void NotifyThreadUnblocked()
{
if (!BlockingConfig.IsCooperativeBlockingEnabled || !Thread.CurrentThread.IsThreadPoolThread)
{
return;
}
Debug.Assert(BlockingConfig.IsCooperativeBlockingEnabled);
Debug.Assert(Thread.CurrentThread.IsThreadPoolThread);

bool wakeGateThread = false;
_threadAdjustmentLock.Acquire();
Expand Down Expand Up @@ -173,11 +162,8 @@ private uint PerformBlockingAdjustment(bool previousDelayElapsed, out bool addWo
}

short maxThreads = MaxThreadsForBlockingAdjustment;
short configuredMaxThreadsWithoutDelay = (short)(_minThreads + BlockingConfig.ThreadsToAddWithoutDelay);
if (configuredMaxThreadsWithoutDelay < BlockingConfig.ThreadsToAddWithoutDelay)
{
configuredMaxThreadsWithoutDelay = maxThreads;
}
short configuredMaxThreadsWithoutDelay =
(short)Math.Min((ushort)(_minThreads + BlockingConfig.ThreadsToAddWithoutDelay), (ushort)maxThreads);

do
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ public void SubtractNumProcessingWork(short value)
public void InterlockedDecrementNumProcessingWork()
{
Debug.Assert(NumProcessingWorkShift == 0);
Interlocked.Decrement(ref _data);

ThreadCounts counts = new ThreadCounts(Interlocked.Decrement(ref _data));
Debug.Assert(counts.NumProcessingWork >= 0);
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2973,29 +2973,35 @@ private bool SpinThenBlockingWait(int millisecondsTimeout, CancellationToken can
#pragma warning disable CA1416 // Validate platform compatibility, issue: https://github.com/dotnet/runtime/issues/44622
if (infiniteWait)
{
ThreadPool.NotifyThreadBlocked();
bool notifyWhenUnblocked = ThreadPool.NotifyThreadBlocked();
try
{
returnValue = mres.Wait(Timeout.Infinite, cancellationToken);
}
finally
{
ThreadPool.NotifyThreadUnblocked();
if (notifyWhenUnblocked)
{
ThreadPool.NotifyThreadUnblocked();
}
}
}
else
{
uint elapsedTimeTicks = ((uint)Environment.TickCount) - startTimeTicks;
if (elapsedTimeTicks < millisecondsTimeout)
{
ThreadPool.NotifyThreadBlocked();
bool notifyWhenUnblocked = ThreadPool.NotifyThreadBlocked();
try
{
returnValue = mres.Wait((int)(millisecondsTimeout - elapsedTimeTicks), cancellationToken);
}
finally
{
ThreadPool.NotifyThreadUnblocked();
if (notifyWhenUnblocked)
{
ThreadPool.NotifyThreadUnblocked();
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public static void GetAvailableThreads(out int workerThreads, out int completion
internal static bool NotifyWorkItemComplete(object? threadLocalCompletionCountObject, int currentTimeMs) =>
PortableThreadPool.ThreadPoolInstance.NotifyWorkItemComplete(threadLocalCompletionCountObject, currentTimeMs);

internal static void NotifyThreadBlocked() => PortableThreadPool.ThreadPoolInstance.NotifyThreadBlocked();
internal static bool NotifyThreadBlocked() => PortableThreadPool.ThreadPoolInstance.NotifyThreadBlocked();
internal static void NotifyThreadUnblocked() => PortableThreadPool.ThreadPoolInstance.NotifyThreadUnblocked();

internal static object GetOrCreateThreadLocalCompletionCountObject() =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -831,10 +831,8 @@ public ThreadPoolWorkQueueThreadLocals(ThreadPoolWorkQueue tpq)

public void TransferLocalWork()
{
object? cb;
while ((cb = workStealingQueue.LocalPop()) != null)
while (workStealingQueue.LocalPop() is object cb)
{
Debug.Assert(null != cb);
workQueue.Enqueue(cb, forceGlobal: true);
}
}
Expand All @@ -844,11 +842,7 @@ public void TransferLocalWork()
// Transfer any pending workitems into the global queue so that they will be executed by another thread
if (null != workStealingQueue)
{
if (null != workQueue)
{
TransferLocalWork();
}

TransferLocalWork();
ThreadPoolWorkQueue.WorkStealingQueueList.Remove(workStealingQueue);
}
}
Expand Down

0 comments on commit 56bd4d9

Please sign in to comment.