Skip to content

Commit

Permalink
ConsistentRingProvider & VirtualBucketsRingProvider should unsubscrib…
Browse files Browse the repository at this point in the history
…e from ISiloStatusOracle on shutdown
  • Loading branch information
ReubenBond committed May 15, 2024
1 parent e2f3a88 commit 7a75050
Show file tree
Hide file tree
Showing 6 changed files with 162 additions and 58 deletions.
66 changes: 36 additions & 30 deletions src/Orleans.Runtime/ConsistentRing/ConsistentRingProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ namespace Orleans.Runtime.ConsistentRing
{
/// <summary>
/// We use the 'backward/clockwise' definition to assign responsibilities on the ring.
/// E.g. in a ring of nodes {5, 10, 15} the responsible for key 7 is 10 (the node is responsible for its predecessing range).
/// E.g. in a ring of nodes {5, 10, 15} the responsible for key 7 is 10 (the node is responsible for its preceding range).
/// The backwards/clockwise approach is consistent with many overlays, e.g., Chord, Cassandra, etc.
/// Note: MembershipOracle uses 'forward/counter-clockwise' definition to assign responsibilities.
/// E.g. in a ring of nodes {5, 10, 15}, the responsible of key 7 is node 5 (the node is responsible for its sucessing range)..
/// E.g. in a ring of nodes {5, 10, 15}, the responsible of key 7 is node 5 (the node is responsible for its succeeding range).
/// </summary>
internal sealed class ConsistentRingProvider :
IConsistentRingProvider, ISiloStatusListener // make the ring shutdown-able?
IConsistentRingProvider, ISiloStatusListener, IDisposable
{
// internal, so that unit tests can access them
internal SiloAddress MyAddress { get; }
Expand All @@ -26,12 +26,14 @@ internal sealed class ConsistentRingProvider :
private bool isRunning;
private readonly int myKey;
private readonly List<IRingRangeListener> statusListeners = new();
private readonly ISiloStatusOracle _siloStatusOracle;
private (IRingRange OldRange, IRingRange NewRange, bool Increased) lastNotification;

public ConsistentRingProvider(SiloAddress siloAddr, ILoggerFactory loggerFactory)
public ConsistentRingProvider(SiloAddress siloAddr, ILoggerFactory loggerFactory, ISiloStatusOracle siloStatusOracle)
{
log = loggerFactory.CreateLogger<ConsistentRingProvider>();
MyAddress = siloAddr;
_siloStatusOracle = siloStatusOracle;
myKey = MyAddress.GetConsistentHashCode();

myRange = RangeFactory.CreateFullRange(); // i am responsible for the whole range
Expand All @@ -40,6 +42,7 @@ public ConsistentRingProvider(SiloAddress siloAddr, ILoggerFactory loggerFactory
// add myself to the list of members
AddServer(MyAddress);
Start();
siloStatusOracle.SubscribeToSiloStatusEvents(this);
}

/// <summary>
Expand Down Expand Up @@ -93,25 +96,14 @@ internal void AddServer(SiloAddress silo)
(myOldIndex == 0 && index == membershipRingList.Count - 1)) // I am the first node, and the new server is the last node
{
IRingRange oldRange = myRange;
try
{
myRange = RangeFactory.CreateRange(unchecked((uint)hash), unchecked((uint)myKey));
}
catch (OverflowException exc)
{
log.LogError(
(int)ErrorCode.ConsistentRingProviderBase + 5,
exc,
"OverflowException: hash as int: x{Hash}, hash as uint: x{HashUInt}, myKey as int: x{MyKey}, myKey as uint: x{MyKeyUInt}.",
hash.ToString("X8"),
((uint)hash).ToString("X8"),
myKey.ToString("X8"),
((uint)myKey).ToString("X8"));
}
myRange = RangeFactory.CreateRange(unchecked((uint)hash), unchecked((uint)myKey));
NotifyLocalRangeSubscribers(oldRange, myRange, false);
}

log.LogInformation("Added Server {SiloAddress}. Current view: {CurrentView}", silo.ToStringWithHashCode(), this.ToString());
if (log.IsEnabled(LogLevel.Debug))
{
log.LogDebug("Added Server {SiloAddress}. Current view: {CurrentView}", silo.ToStringWithHashCode(), this.ToString());
}
}
}

Expand All @@ -130,6 +122,7 @@ public override string ToString()
IRingRange range = RangeFactory.CreateRange(unchecked((uint)curr.GetConsistentHashCode()), unchecked((uint)next.GetConsistentHashCode()));
sb.Append($"{curr:H} -> {range}, ");
}

return sb.Append(']').ToString();
}
}
Expand Down Expand Up @@ -170,11 +163,14 @@ internal void RemoveServer(SiloAddress silo)
}
}

log.LogInformation(
"Removed Server {SiloAddress} hash {Hash}. Current view {CurrentView}",
silo,
silo.GetConsistentHashCode(),
this.ToString());
if (log.IsEnabled(LogLevel.Debug))
{
log.LogDebug(
"Removed Server {SiloAddress} hash {Hash}. Current view {CurrentView}",
silo,
silo.GetConsistentHashCode(),
this.ToString());
}
}
}

Expand Down Expand Up @@ -203,13 +199,18 @@ public bool UnSubscribeFromRangeChangeEvents(IRingRangeListener observer)

private void NotifyLocalRangeSubscribers(IRingRange old, IRingRange now, bool increased)
{
log.LogInformation("NotifyLocalRangeSubscribers about old {OldRange} new {NewRange} increased? {IsIncreased}", old, now, increased);
if (log.IsEnabled(LogLevel.Debug))
{
log.LogDebug("NotifyLocalRangeSubscribers about old {OldRange} new {NewRange} increased? {IsIncreased}", old, now, increased);
}

IRingRangeListener[] copy;
lock (statusListeners)
{
lastNotification = (old, now, increased);
copy = statusListeners.ToArray();
}

foreach (IRingRangeListener listener in copy)
{
try
Expand All @@ -218,14 +219,14 @@ private void NotifyLocalRangeSubscribers(IRingRange old, IRingRange now, bool in
}
catch (Exception exc)
{
log.LogError(
log.LogWarning(
(int)ErrorCode.CRP_Local_Subscriber_Exception,
exc,
"Local IRangeChangeListener {Name} has thrown an exception when was notified about RangeChangeNotification about old {OldRange} new {NewRange} increased? {IsIncrease}",
"Error notifying listener '{ListenerType}' of ring range {AdjustmentKind} from '{OldRange}' to '{NewRange}'.",
listener.GetType().FullName,
increased ? "expansion" : "contraction",
old,
now,
increased);
now);
}
}
}
Expand Down Expand Up @@ -312,5 +313,10 @@ private bool IsSiloNextInTheRing(SiloAddress siloAddr, uint hash, bool excludeMy
{
return siloAddr.GetConsistentHashCode() >= hash && (!siloAddr.Equals(MyAddress) || !excludeMySelf);
}

public void Dispose()
{
_siloStatusOracle.UnSubscribeFromSiloStatusEvents(this);
}
}
}
27 changes: 18 additions & 9 deletions src/Orleans.Runtime/ConsistentRing/VirtualBucketsRingProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,28 +7,29 @@ namespace Orleans.Runtime.ConsistentRing
{
/// <summary>
/// We use the 'backward/clockwise' definition to assign responsibilities on the ring.
/// E.g. in a ring of nodes {5, 10, 15} the responsible for key 7 is 10 (the node is responsible for its predecessing range).
/// E.g. in a ring of nodes {5, 10, 15} the responsible for key 7 is 10 (the node is responsible for its preceding range).
/// The backwards/clockwise approach is consistent with many overlays, e.g., Chord, Cassandra, etc.
/// Note: MembershipOracle uses 'forward/counter-clockwise' definition to assign responsibilities.
/// E.g. in a ring of nodes {5, 10, 15}, the responsible of key 7 is node 5 (the node is responsible for its sucessing range)..
/// E.g. in a ring of nodes {5, 10, 15}, the responsible of key 7 is node 5 (the node is responsible for its succeeding range).
/// </summary>
internal sealed class VirtualBucketsRingProvider :
IConsistentRingProvider, ISiloStatusListener
IConsistentRingProvider, ISiloStatusListener, IDisposable
{
private readonly List<IRingRangeListener> statusListeners = new();
private readonly SortedDictionary<uint, SiloAddress> bucketsMap = new();
private (uint Hash, SiloAddress SiloAddress)[] sortedBucketsList; // flattened sorted bucket list for fast lock-free calculation of CalculateTargetSilo
private readonly ILogger logger;
private readonly SiloAddress myAddress;
private readonly int numBucketsPerSilo;
private readonly ISiloStatusOracle _siloStatusOracle;
private bool running;
private IRingRange myRange;
private (IRingRange OldRange, IRingRange NewRange, bool Increased) lastNotification;

internal VirtualBucketsRingProvider(SiloAddress siloAddress, ILoggerFactory loggerFactory, int numVirtualBuckets)
internal VirtualBucketsRingProvider(SiloAddress siloAddress, ILoggerFactory loggerFactory, int numVirtualBuckets, ISiloStatusOracle siloStatusOracle)
{
numBucketsPerSilo = numVirtualBuckets;

_siloStatusOracle = siloStatusOracle;
if (numBucketsPerSilo <= 0)
throw new IndexOutOfRangeException($"numBucketsPerSilo is out of the range. numBucketsPerSilo = {numBucketsPerSilo}");

Expand All @@ -54,6 +55,7 @@ internal VirtualBucketsRingProvider(SiloAddress siloAddress, ILoggerFactory logg

// add myself to the list of members
AddServer(myAddress);
siloStatusOracle.SubscribeToSiloStatusEvents(this);
}

private void Stop()
Expand Down Expand Up @@ -118,14 +120,14 @@ private void NotifyLocalRangeSubscribers(IRingRange old, IRingRange now, bool in
}
catch (Exception exc)
{
logger.LogError(
logger.LogWarning(
(int)ErrorCode.CRP_Local_Subscriber_Exception,
exc,
"Local IRangeChangeListener {Name} has thrown an exception when was notified about RangeChangeNotification about old {OldRange} new {NewRange} increased? {IsIncrease}",
"Error notifying listener '{ListenerType}' of ring range {AdjustmentKind} from '{OldRange}' to '{NewRange}'.",
listener.GetType().FullName,
increased ? "expansion" : "contraction",
old,
now,
increased);
now);
}
}
}
Expand Down Expand Up @@ -322,12 +324,19 @@ private SiloAddress CalculateTargetSilo(uint hash, bool excludeThisSiloIfStoppin
s = snapshotBucketsList.Length > 1 ? snapshotBucketsList[1] : default;
}
}

if (logger.IsEnabled(LogLevel.Trace))
{
logger.LogTrace("Calculated ring partition owner silo {Owner} for key {Key}: {Key} --> {OwnerHash}", s.SiloAddress, hash, hash, s.Hash);
}

return s.SiloAddress;
}

public void Dispose()
{
_siloStatusOracle.UnSubscribeFromSiloStatusEvents(this);
}
}
}

5 changes: 3 additions & 2 deletions src/Orleans.Runtime/Hosting/DefaultSiloServices.cs
Original file line number Diff line number Diff line change
Expand Up @@ -262,12 +262,13 @@ internal static void AddDefaultServices(ISiloBuilder builder)
var consistentRingOptions = sp.GetRequiredService<IOptions<ConsistentRingOptions>>().Value;
var siloDetails = sp.GetRequiredService<ILocalSiloDetails>();
var loggerFactory = sp.GetRequiredService<ILoggerFactory>();
var siloStatusOracle = sp.GetRequiredService<ISiloStatusOracle>();
if (consistentRingOptions.UseVirtualBucketsConsistentRing)
{
return new VirtualBucketsRingProvider(siloDetails.SiloAddress, loggerFactory, consistentRingOptions.NumVirtualBucketsConsistentRing);
return new VirtualBucketsRingProvider(siloDetails.SiloAddress, loggerFactory, consistentRingOptions.NumVirtualBucketsConsistentRing, siloStatusOracle);
}
return new ConsistentRingProvider(siloDetails.SiloAddress, loggerFactory);
return new ConsistentRingProvider(siloDetails.SiloAddress, loggerFactory, siloStatusOracle);
});

services.AddSingleton<IConfigureOptions<GrainTypeOptions>, DefaultGrainTypeOptionsProvider>();
Expand Down
10 changes: 2 additions & 8 deletions src/Orleans.Runtime/Silo/Silo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ public class Silo
/// </summary>
internal string Name => this.siloDetails.Name;
internal ILocalGrainDirectory LocalGrainDirectory { get { return localGrainDirectory; } }
internal IConsistentRingProvider RingProvider { get; private set; }
internal List<GrainService> GrainServices => grainServices;
internal IConsistentRingProvider RingProvider { get; }

internal SystemStatus SystemStatus { get; set; }

Expand Down Expand Up @@ -81,6 +80,7 @@ public Silo(ILocalSiloDetails siloDetails, IServiceProvider services)
{
string name = siloDetails.Name;
// Temporarily still require this. Hopefuly gone when 2.0 is released.
RingProvider = services.GetRequiredService<IConsistentRingProvider>();
this.siloDetails = siloDetails;
this.SystemStatus = SystemStatus.Creating;

Expand Down Expand Up @@ -156,9 +156,6 @@ public Silo(ILocalSiloDetails siloDetails, IServiceProvider services)
// This has to come after the message center //; note that it then gets injected back into the message center.;
localGrainDirectory = Services.GetRequiredService<LocalGrainDirectory>();

// Now the consistent ring provider
RingProvider = Services.GetRequiredService<IConsistentRingProvider>();

catalog = Services.GetRequiredService<Catalog>();

siloStatusOracle = Services.GetRequiredService<ISiloStatusOracle>();
Expand Down Expand Up @@ -229,9 +226,6 @@ private void InjectDependencies()
catalog.SiloStatusOracle = this.siloStatusOracle;
this.siloStatusOracle.SubscribeToSiloStatusEvents(localGrainDirectory);

// consistentRingProvider is not a system target per say, but it behaves like the localGrainDirectory, so it is here
this.siloStatusOracle.SubscribeToSiloStatusEvents((ISiloStatusListener)RingProvider);

this.siloStatusOracle.SubscribeToSiloStatusEvents(Services.GetRequiredService<DeploymentLoadPublisher>());

// SystemTarget for provider init calls
Expand Down
56 changes: 52 additions & 4 deletions test/NonSilo.Tests/General/RingTests_Standalone.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Microsoft.Extensions.Logging.Abstractions;
using System.Net;
using Microsoft.Extensions.Logging.Abstractions;
using Orleans.Runtime;
using Orleans.Runtime.ConsistentRing;
using TestExtensions;
Expand Down Expand Up @@ -100,7 +101,7 @@ private Dictionary<SiloAddress, ConsistentRingProvider> CreateServers(int n)
for (int i = 1; i <= n; i++)
{
SiloAddress addr = SiloAddressUtils.NewLocalSiloAddress(i);
rings.Add(addr, new ConsistentRingProvider(addr, NullLoggerFactory.Instance));
rings.Add(addr, new ConsistentRingProvider(addr, NullLoggerFactory.Instance, new FakeSiloStatusOracle()));
}
return rings;
}
Expand Down Expand Up @@ -203,6 +204,53 @@ private static int CompareSiloAddressesByHash(SiloAddress x, SiloAddress y)
}
}

internal sealed class FakeSiloStatusOracle : ISiloStatusOracle
{
private readonly Dictionary<SiloAddress, SiloStatus> _content = [];

public FakeSiloStatusOracle()
{
SiloAddress = SiloAddress.New(IPAddress.Loopback, Random.Shared.Next(2000, 40_000), SiloAddress.AllocateNewGeneration());
_content[SiloAddress] = SiloStatus.Active;
}

public SiloStatus CurrentStatus => SiloStatus.Active;

public string SiloName => "TestSilo";

public SiloAddress SiloAddress { get; }

public SiloStatus GetApproximateSiloStatus(SiloAddress siloAddress)
{
if (_content.TryGetValue(siloAddress, out var status))
{
return status;
}
return SiloStatus.None;
}

public Dictionary<SiloAddress, SiloStatus> GetApproximateSiloStatuses(bool onlyActive = false)
{
return onlyActive
? new Dictionary<SiloAddress, SiloStatus>(_content.Where(kvp => kvp.Value == SiloStatus.Active))
: new Dictionary<SiloAddress, SiloStatus>(_content);
}

public void SetSiloStatus(SiloAddress siloAddress, SiloStatus status) => _content[siloAddress] = status;

public bool IsDeadSilo(SiloAddress silo) => GetApproximateSiloStatus(silo) == SiloStatus.Dead;

public bool IsFunctionalDirectory(SiloAddress siloAddress) => !GetApproximateSiloStatus(siloAddress).IsTerminating();

#region Not Implemented
public bool SubscribeToSiloStatusEvents(ISiloStatusListener observer) => throw new NotImplementedException();

public bool TryGetSiloName(SiloAddress siloAddress, out string siloName) => throw new NotImplementedException();

public bool UnSubscribeFromSiloStatusEvents(ISiloStatusListener observer) => throw new NotImplementedException();
#endregion
}

internal class RangeBreakable
{
private List<SingleRange> ranges { get; set; }
Expand All @@ -217,10 +265,10 @@ public RangeBreakable()
public bool Remove(IRingRange range)
{
bool wholerange = true;
foreach (SingleRange s in RangeFactory.GetSubRanges(range))
foreach (var s in RangeFactory.GetSubRanges(range))
{
bool found = false;
foreach (SingleRange m in ranges)
foreach (var m in ranges)
{
if (m.Begin == m.End) // treat full range as special case
{
Expand Down
Loading

0 comments on commit 7a75050

Please sign in to comment.