Skip to content

Commit

Permalink
Normalize MaxAvailableMemory based on other candidates. Add Activatio…
Browse files Browse the repository at this point in the history
…nCountWeight to options (#9028)
  • Loading branch information
ReubenBond committed May 31, 2024
1 parent 47b29f8 commit f6659d9
Show file tree
Hide file tree
Showing 4 changed files with 166 additions and 112 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public sealed class ResourceOptimizedPlacementOptions
/// <summary>
/// The default value of <see cref="MemoryUsageWeight"/>.
/// </summary>
public const int DEFAULT_MEMORY_USAGE_WEIGHT = 30;
public const int DEFAULT_MEMORY_USAGE_WEIGHT = 20;

/// <summary>
/// The importance of the available memory to the silo.
Expand Down Expand Up @@ -65,7 +65,21 @@ public sealed class ResourceOptimizedPlacementOptions
/// <summary>
/// The default value of <see cref="MaxAvailableMemoryWeight"/>.
/// </summary>
public const int DEFAULT_MAX_AVAILABLE_MEMORY_WEIGHT = 10;
public const int DEFAULT_MAX_AVAILABLE_MEMORY_WEIGHT = 5;

/// <summary>
/// The importance of the current activation count to the silo.
/// </summary>
/// <remarks><i>
/// <para>A <u>higher</u> values results in the placement favoring silos with <u>lower</u> activation count.</para>
/// <para>Valid range is [0-100]</para>
/// </i></remarks>
public int ActivationCountWeight { get; set; } = DEFAULT_ACTIVATION_COUNT_WEIGHT;

/// <summary>
/// The default value of <see cref="ActivationCountWeight"/>.
/// </summary>
public const int DEFAULT_ACTIVATION_COUNT_WEIGHT = 15;

/// <summary>
/// The specified margin for which: if two silos (one of them being the local to the current pending activation), have a utilization score that should be considered "the same" within this margin.
Expand Down Expand Up @@ -113,6 +127,11 @@ public void ValidateConfiguration()
ThrowOutOfRange(nameof(ResourceOptimizedPlacementOptions.MaxAvailableMemoryWeight));
}

if (_options.ActivationCountWeight < 0 || _options.ActivationCountWeight > 100)
{
ThrowOutOfRange(nameof(ResourceOptimizedPlacementOptions.ActivationCountWeight));
}

if (_options.LocalSiloPreferenceMargin < 0 || _options.LocalSiloPreferenceMargin > 100)
{
ThrowOutOfRange(nameof(ResourceOptimizedPlacementOptions.LocalSiloPreferenceMargin));
Expand Down
128 changes: 59 additions & 69 deletions src/Orleans.Runtime/Placement/ResourceOptimizedPlacementDirector.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
#nullable enable
using System;
using System.Buffers;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;
using Microsoft.Extensions.Options;
Expand All @@ -14,37 +13,36 @@ namespace Orleans.Runtime.Placement;
// See: https://www.ledjonbehluli.com/posts/orleans_resource_placement_kalman/
internal sealed class ResourceOptimizedPlacementDirector : IPlacementDirector, ISiloStatisticsChangeListener
{
/// <summary>
/// 1 / (1024 * 1024)
/// </summary>
private const float MaxAvailableMemoryScalingFactor = 0.00000095367431640625f;
private const int FourKiloByte = 4096;

private readonly SiloAddress _localSilo;
private readonly NormalizedWeights _weights;
private readonly float _localSiloPreferenceMargin;
private readonly ConcurrentDictionary<SiloAddress, ResourceStatistics> _siloStatistics = [];

private Task<SiloAddress> _cachedLocalSilo;
private readonly Task<SiloAddress> _cachedLocalSilo;

public ResourceOptimizedPlacementDirector(
ILocalSiloDetails localSiloDetails,
DeploymentLoadPublisher deploymentLoadPublisher,
IOptions<ResourceOptimizedPlacementOptions> options)
{
_localSilo = localSiloDetails.SiloAddress;
_cachedLocalSilo = Task.FromResult(_localSilo);
_weights = NormalizeWeights(options.Value);
_localSiloPreferenceMargin = (float)options.Value.LocalSiloPreferenceMargin / 100;
deploymentLoadPublisher.SubscribeToStatisticsChangeEvents(this);
}

private static NormalizedWeights NormalizeWeights(ResourceOptimizedPlacementOptions input)
{
int totalWeight = input.CpuUsageWeight + input.MemoryUsageWeight + input.AvailableMemoryWeight + input.MaxAvailableMemoryWeight;
int totalWeight = input.CpuUsageWeight + input.MemoryUsageWeight + input.AvailableMemoryWeight + input.MaxAvailableMemoryWeight + input.ActivationCountWeight;

return totalWeight == 0 ? new(0f, 0f, 0f, 0f) :
return totalWeight == 0 ? new(0f, 0f, 0f, 0f, 0f) :
new(
CpuUsageWeight: (float)input.CpuUsageWeight / totalWeight,
MemoryUsageWeight: (float)input.MemoryUsageWeight / totalWeight,
AvailableMemoryWeight: (float)input.AvailableMemoryWeight / totalWeight,
MaxAvailableMemoryWeight: (float)input.MaxAvailableMemoryWeight / totalWeight);
MaxAvailableMemoryWeight: (float)input.MaxAvailableMemoryWeight / totalWeight,
ActivationCountWeight: (float)input.ActivationCountWeight / totalWeight);
}

public Task<SiloAddress> OnAddActivation(PlacementStrategy strategy, PlacementTarget target, IPlacementContext context)
Expand All @@ -58,7 +56,7 @@ public Task<SiloAddress> OnAddActivation(PlacementStrategy strategy, PlacementTa

if (compatibleSilos.Length == 0)
{
throw new SiloUnavailableException($"Cannot place grain with Id = [{target.GrainIdentity}], because there are no compatible silos.");
throw new SiloUnavailableException($"Cannot place grain '{target.GrainIdentity}' because there are no compatible silos.");
}

if (compatibleSilos.Length == 1)
Expand All @@ -71,23 +69,11 @@ public Task<SiloAddress> OnAddActivation(PlacementStrategy strategy, PlacementTa
return Task.FromResult(compatibleSilos[Random.Shared.Next(compatibleSilos.Length)]);
}

var bestCandidate = GetBestSiloCandidate(compatibleSilos);
if (IsLocalSiloPreferable(context, compatibleSilos, bestCandidate.Value))
{
return _cachedLocalSilo ??= Task.FromResult(context.LocalSilo);
}

return Task.FromResult(bestCandidate.Key);
}

private KeyValuePair<SiloAddress, float> GetBestSiloCandidate(SiloAddress[] compatibleSilos)
{
(int Index, float Score) pick;
int compatibleSilosCount = compatibleSilos.Length;

// It is good practice not to allocate more than 1[KB] on the stack
// but the size of ValueTuple<int, ResourceStatistics> = 24 bytes, by increasing
// the limit to 4[KB] we can stackalloc for up to 4096 / 24 ~= 170 silos in a cluster.
(int Index, float Score, float? LocalSiloScore) pick;
int compatibleSilosCount = compatibleSilos.Length;
if (compatibleSilosCount * Unsafe.SizeOf<(int, ResourceStatistics)>() <= FourKiloByte)
{
pick = MakePick(stackalloc (int, ResourceStatistics)[compatibleSilosCount]);
Expand All @@ -99,12 +85,22 @@ private KeyValuePair<SiloAddress, float> GetBestSiloCandidate(SiloAddress[] comp
ArrayPool<(int, ResourceStatistics)>.Shared.Return(relevantSilos);
}

return new KeyValuePair<SiloAddress, float>(compatibleSilos[pick.Index], pick.Score);
var localSiloScore = pick.LocalSiloScore;
if (!localSiloScore.HasValue || context.LocalSiloStatus != SiloStatus.Active || localSiloScore.Value - _localSiloPreferenceMargin > pick.Score)
{
var bestCandidate = compatibleSilos[pick.Index];
return Task.FromResult(bestCandidate);
}

(int, float) MakePick(Span<(int, ResourceStatistics)> relevantSilos)
return _cachedLocalSilo;

(int PickIndex, float PickScore, float? LocalSiloScore) MakePick(scoped Span<(int, ResourceStatistics)> relevantSilos)
{
// Get all compatible silos which aren't overloaded
int relevantSilosCount = 0;
float maxMaxAvailableMemory = 0;
int maxActivationCount = 0;
ResourceStatistics? localSiloStatistics = null;
for (var i = 0; i < compatibleSilos.Length; ++i)
{
var silo = compatibleSilos[i];
Expand All @@ -114,6 +110,21 @@ private KeyValuePair<SiloAddress, float> GetBestSiloCandidate(SiloAddress[] comp
{
relevantSilos[relevantSilosCount++] = new(i, stats);
}

if (stats.MaxAvailableMemory > maxMaxAvailableMemory)
{
maxMaxAvailableMemory = stats.MaxAvailableMemory;
}

if (stats.ActivationCount > maxActivationCount)
{
maxActivationCount = stats.ActivationCount;
}

if (silo.Equals(_localSilo))
{
localSiloStatistics = stats;
}
}
}

Expand All @@ -131,7 +142,7 @@ private KeyValuePair<SiloAddress, float> GetBestSiloCandidate(SiloAddress[] comp

foreach (var (index, statistics) in candidates)
{
float score = CalculateScore(in statistics);
float score = CalculateScore(in statistics, maxMaxAvailableMemory, maxActivationCount);

// It's very unlikely, but there could be more than 1 silo that has the same score,
// so we apply some jittering to avoid pick the first one in the short-list.
Expand All @@ -143,7 +154,14 @@ private KeyValuePair<SiloAddress, float> GetBestSiloCandidate(SiloAddress[] comp
}
}

return pick;
float? localSiloScore = null;
if (localSiloStatistics.HasValue && !localSiloStatistics.Value.IsOverloaded)
{
var localStats = localSiloStatistics.Value;
localSiloScore = CalculateScore(in localStats, maxMaxAvailableMemory, maxActivationCount);
}

return (pick.Index, pick.Score, localSiloScore);
}

// Variant of the Modern Fisher-Yates shuffle which stops after shuffling the first `prefixLength` elements,
Expand All @@ -165,39 +183,8 @@ static void ShufflePrefix(Span<(int SiloIndex, ResourceStatistics SiloStatistics
}
}

private bool IsLocalSiloPreferable(IPlacementContext context, SiloAddress[] compatibleSilos, float bestCandidateScore)
{
if (context.LocalSiloStatus != SiloStatus.Active || !compatibleSilos.Contains(context.LocalSilo))
{
return false;
}

if (!_siloStatistics.TryGetValue(context.LocalSilo, out var localStats))
{
return false;
}

if (localStats.IsOverloaded)
{
return false;
}

var localSiloScore = CalculateScore(in localStats);
return localSiloScore - _localSiloPreferenceMargin <= bestCandidateScore;
}

/// <summary>
/// Always returns a value [0-1]
/// </summary>
/// <returns>
/// score = cpu_weight * (cpu_usage / 100) +
/// mem_usage_weight * (mem_usage / physical_mem) +
/// mem_avail_weight * [1 - (mem_avail / physical_mem)]
/// physical_mem_weight * (1 / (1024 * 1024 * physical_mem)
/// </returns>
/// <remarks>physical_mem is represented in [MB] to keep the result within [0-1] in cases of silos having physical_mem less than [1GB]</remarks>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private float CalculateScore(ref readonly ResourceStatistics stats) // as size of ResourceStatistics > IntPtr, we pass it by (readonly)-reference to avoid potential defensive copying
private float CalculateScore(ref readonly ResourceStatistics stats, float maxMaxAvailableMemory, int maxActivationCount)
{
float normalizedCpuUsage = stats.CpuUsage / 100f;
float score = _weights.CpuUsageWeight * normalizedCpuUsage;
Expand All @@ -208,14 +195,16 @@ private float CalculateScore(ref readonly ResourceStatistics stats) // as size o

float normalizedMemoryUsage = stats.MemoryUsage / maxAvailableMemory;
float normalizedAvailableMemory = 1 - stats.AvailableMemory / maxAvailableMemory;
float normalizedMaxAvailableMemoryWeight = MaxAvailableMemoryScalingFactor * maxAvailableMemory;
float normalizedMaxAvailableMemory = maxAvailableMemory / maxMaxAvailableMemory;

score += _weights.MemoryUsageWeight * normalizedMemoryUsage +
_weights.AvailableMemoryWeight * normalizedAvailableMemory +
_weights.MaxAvailableMemoryWeight * normalizedMaxAvailableMemoryWeight;
_weights.MaxAvailableMemoryWeight * normalizedMaxAvailableMemory;
}

Debug.Assert(score >= 0f && score <= 1f);
score += _weights.ActivationCountWeight * stats.ActivationCount / maxActivationCount;

Debug.Assert(score >= 0f && score <= 1.01f);

return score;
}
Expand All @@ -230,8 +219,8 @@ public void SiloStatisticsChangeNotification(SiloAddress address, SiloRuntimeSta
addValueFactory: static (_, statistics) => ResourceStatistics.FromRuntime(statistics),
updateValueFactory: static (_, _, statistics) => ResourceStatistics.FromRuntime(statistics));

private record NormalizedWeights(float CpuUsageWeight, float MemoryUsageWeight, float AvailableMemoryWeight, float MaxAvailableMemoryWeight);
private readonly record struct ResourceStatistics(bool IsOverloaded, float CpuUsage, float MemoryUsage, float AvailableMemory, float MaxAvailableMemory)
private record NormalizedWeights(float CpuUsageWeight, float MemoryUsageWeight, float AvailableMemoryWeight, float MaxAvailableMemoryWeight, float ActivationCountWeight);
private readonly record struct ResourceStatistics(bool IsOverloaded, float CpuUsage, float MemoryUsage, float AvailableMemory, float MaxAvailableMemory, int ActivationCount)
{
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static ResourceStatistics FromRuntime(SiloRuntimeStatistics statistics)
Expand All @@ -240,6 +229,7 @@ public static ResourceStatistics FromRuntime(SiloRuntimeStatistics statistics)
CpuUsage: statistics.EnvironmentStatistics.CpuUsagePercentage,
MemoryUsage: statistics.EnvironmentStatistics.MemoryUsageBytes,
AvailableMemory: statistics.EnvironmentStatistics.AvailableMemoryBytes,
MaxAvailableMemory: statistics.EnvironmentStatistics.MaximumAvailableMemoryBytes);
MaxAvailableMemory: statistics.EnvironmentStatistics.MaximumAvailableMemoryBytes,
ActivationCount: statistics.ActivationCount);
}
}
41 changes: 0 additions & 41 deletions test/TesterInternal/General/PlacementOptionsTest.cs

This file was deleted.

Loading

0 comments on commit f6659d9

Please sign in to comment.