From a264ce070734d9fe80efc26eb6c59d227f2e3a06 Mon Sep 17 00:00:00 2001 From: Reuben Bond <203839+ReubenBond@users.noreply.github.com> Date: Thu, 11 Jul 2024 10:11:49 -0700 Subject: [PATCH] Coordinate shutdown of AdaptiveDirectoryCacheMaintainer with LocalGrainDirectory (#9061) * Coordinate shutdown of AdaptiveDirectoryCacheMaintainer with LocalGrainDirectory * Add README for how to run distributed tests locally. --- .../Scheduler/TaskSchedulerAgent.cs | 223 ------------------ .../AdaptiveDirectoryCacheMaintainer.cs | 212 ++++++++++------- .../GrainDirectory/ILocalGrainDirectory.cs | 2 +- .../GrainDirectory/LocalGrainDirectory.cs | 10 +- src/Orleans.Runtime/Silo/Silo.cs | 19 +- .../DistributedTests.Server/ServerCommand.cs | 2 +- test/DistributedTests/README.md | 37 +++ .../Directory/MockLocalGrainDirectory.cs | 2 +- 8 files changed, 171 insertions(+), 336 deletions(-) delete mode 100644 src/Orleans.Core/Scheduler/TaskSchedulerAgent.cs create mode 100644 test/DistributedTests/README.md diff --git a/src/Orleans.Core/Scheduler/TaskSchedulerAgent.cs b/src/Orleans.Core/Scheduler/TaskSchedulerAgent.cs deleted file mode 100644 index db4445880f..0000000000 --- a/src/Orleans.Core/Scheduler/TaskSchedulerAgent.cs +++ /dev/null @@ -1,223 +0,0 @@ -using System; -using System.Threading; -using System.Threading.Tasks; -using Microsoft.Extensions.Logging; - -namespace Orleans.Runtime -{ - internal abstract class TaskSchedulerAgent : IDisposable - { - protected enum FaultBehavior - { - RestartOnFault, // Restart the agent if it faults - IgnoreFault // Allow the agent to stop if it faults, but take no other action (other than logging) - } - - private enum AgentState - { - Stopped, - Running, - StopRequested - } - - protected CancellationTokenSource Cts { get; private set; } - private readonly object lockable; - protected ILogger Log { get; } - protected FaultBehavior OnFault { get; set; } - private bool disposed; - - private AgentState State { get; set; } - private string Name { get; set; } - - protected TaskSchedulerAgent(ILoggerFactory loggerFactory) - { - Cts = new CancellationTokenSource(); - - this.Log = loggerFactory.CreateLogger(this.GetType()); - var typeName = GetType().FullName; - if (typeName.StartsWith("Orleans.", StringComparison.Ordinal)) - { - typeName = typeName[8..]; - } - - Name = typeName; - - lockable = new object(); - OnFault = FaultBehavior.IgnoreFault; - } - - public virtual void Start() - { - ThrowIfDisposed(); - lock (lockable) - { - if (State == AgentState.Running) - { - return; - } - - if (State == AgentState.Stopped) - { - Cts = new CancellationTokenSource(); - } - - State = AgentState.Running; - } - - Task.Run(() => this.StartAsync()).Ignore(); - - if (Log.IsEnabled(LogLevel.Debug)) Log.LogDebug("Started asynch agent {Name}", this.Name); - } - - private async Task StartAsync() - { - var handled = false; - try - { - await this.Run(); - } - catch (Exception exception) - { - this.HandleFault(exception); - handled = true; - } - finally - { - if (!handled) - { - if (this.OnFault == FaultBehavior.RestartOnFault && !this.Cts.IsCancellationRequested) - { - try - { - if (Log.IsEnabled(LogLevel.Debug)) Log.LogDebug("Run completed on agent {Name} - restarting", Name); - this.Start(); - } - catch (Exception exc) - { - this.Log.LogError((int)ErrorCode.Runtime_Error_100027, exc, $"Unable to restart {nameof(TaskSchedulerAgent)}"); - this.State = AgentState.Stopped; - } - } - } - } - } - - protected abstract Task Run(); - - [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")] - public virtual void Stop() - { - try - { - ThrowIfDisposed(); - lock (lockable) - { - if (State == AgentState.Running) - { - State = AgentState.StopRequested; - Cts.Cancel(); - State = AgentState.Stopped; - } - } - } - catch (Exception exc) - { - if (Log.IsEnabled(LogLevel.Debug)) - { - // ignore. Just make sure stop does not throw. - Log.LogDebug(exc, "Ignoring error during Stop"); - } - } - - if (Log.IsEnabled(LogLevel.Debug)) - { - Log.LogDebug("Stopped agent"); - } - } - - public void Dispose() - { - Dispose(true); - GC.SuppressFinalize(this); - } - - protected virtual void Dispose(bool disposing) - { - if (!disposing || disposed) return; - - if (Cts != null) - { - Cts.Dispose(); - Cts = null; - } - - disposed = true; - } - - public override string ToString() - { - return Name; - } - - /// - /// Handles fault - /// - /// - /// false agent has been stopped - protected bool HandleFault(Exception ex) - { - if (State == AgentState.StopRequested) - { - return false; - } - else - { - State = AgentState.Stopped; - } - - if (ex is ThreadAbortException) - { - return false; - } - - LogExecutorError(ex); - - if (OnFault == FaultBehavior.RestartOnFault && !Cts.IsCancellationRequested) - { - try - { - Start(); - } - catch (Exception exc) - { - Log.LogError((int)ErrorCode.Runtime_Error_100027, exc, $"Unable to restart {nameof(TaskSchedulerAgent)}"); - State = AgentState.Stopped; - } - } - - return State != AgentState.Stopped; - } - - private void LogExecutorError(Exception exc) - { - switch (OnFault) - { - case FaultBehavior.IgnoreFault: - Log.LogError((int)ErrorCode.Runtime_Error_100025, exc, "Asynch agent {Name} encountered unexpected exception. The executor will exit.", Name); - break; - case FaultBehavior.RestartOnFault: - Log.LogError((int)ErrorCode.Runtime_Error_100026, exc, "Asynch agent {Name} encountered unexpected exception. The Stage will be restarted.", Name); - break; - default: - throw new NotImplementedException(); - } - } - - private void ThrowIfDisposed() - { - if (disposed) ThrowDisposed(); - - static void ThrowDisposed() => throw new ObjectDisposedException("Cannot access disposed AsynchAgent"); - } - } -} \ No newline at end of file diff --git a/src/Orleans.Runtime/GrainDirectory/AdaptiveDirectoryCacheMaintainer.cs b/src/Orleans.Runtime/GrainDirectory/AdaptiveDirectoryCacheMaintainer.cs index d8c9f468e3..cdd982489f 100644 --- a/src/Orleans.Runtime/GrainDirectory/AdaptiveDirectoryCacheMaintainer.cs +++ b/src/Orleans.Runtime/GrainDirectory/AdaptiveDirectoryCacheMaintainer.cs @@ -1,6 +1,8 @@ +#nullable enable using System; using System.Collections.Generic; using System.Diagnostics; +using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.Logging; using Orleans.GrainDirectory; @@ -8,117 +10,143 @@ namespace Orleans.Runtime.GrainDirectory { - internal class AdaptiveDirectoryCacheMaintainer : TaskSchedulerAgent + internal sealed class AdaptiveDirectoryCacheMaintainer { private static readonly TimeSpan SLEEP_TIME_BETWEEN_REFRESHES = Debugger.IsAttached ? TimeSpan.FromMinutes(5) : TimeSpan.FromMinutes(1); // this should be something like minTTL/4 private readonly AdaptiveGrainDirectoryCache cache; private readonly LocalGrainDirectory router; private readonly IInternalGrainFactory grainFactory; + private readonly CancellationTokenSource _shutdownCts = new(); private long lastNumAccesses; // for stats private long lastNumHits; // for stats + private Task? _runTask; internal AdaptiveDirectoryCacheMaintainer( LocalGrainDirectory router, AdaptiveGrainDirectoryCache cache, IInternalGrainFactory grainFactory, ILoggerFactory loggerFactory) - : base(loggerFactory) { + Log = loggerFactory.CreateLogger(); this.grainFactory = grainFactory; this.router = router; this.cache = cache; lastNumAccesses = 0; lastNumHits = 0; - OnFault = FaultBehavior.RestartOnFault; } - protected override async Task Run() + private ILogger Log { get; } + + public void Start() + { + _runTask = Run(); + } + + public async Task StopAsync() { - while (router.Running) + _shutdownCts.Cancel(); + if (_runTask is { } task) { - // Run through all cache entries and do the following: - // 1. If the entry is not expired, skip it - // 2. If the entry is expired and was not accessed in the last time interval -- throw it away - // 3. If the entry is expired and was accessed in the last time interval, put into "fetch-batch-requests" list + await task; + } + } - // At the end of the process, fetch batch requests for entries that need to be refreshed + private async Task Run() + { + // Immediately yield back to the caller + await Task.CompletedTask.ConfigureAwait(ConfigureAwaitOptions.ForceYielding | ConfigureAwaitOptions.ContinueOnCapturedContext); - // Upon receiving refreshing answers, if the entry was not changed, double its expiration timer. - // If it was changed, update the cache and reset the expiration timer. + while (!_shutdownCts.IsCancellationRequested) + { + try + { + // recheck every X seconds (Consider making it a configurable parameter) + await Task.Delay(SLEEP_TIME_BETWEEN_REFRESHES); - // this dictionary holds a map between a silo address and the list of grains that need to be refreshed - var fetchInBatchList = new Dictionary>(); + // Run through all cache entries and do the following: + // 1. If the entry is not expired, skip it + // 2. If the entry is expired and was not accessed in the last time interval -- throw it away + // 3. If the entry is expired and was accessed in the last time interval, put into "fetch-batch-requests" list - // get the list of cached grains + // At the end of the process, fetch batch requests for entries that need to be refreshed + // Upon receiving refreshing answers, if the entry was not changed, double its expiration timer. + // If it was changed, update the cache and reset the expiration timer. - // for debug only - int cnt1 = 0, cnt2 = 0, cnt3 = 0, cnt4 = 0; + // this dictionary holds a map between a silo address and the list of grains that need to be refreshed + var fetchInBatchList = new Dictionary>(); - // run through all cache entries - var enumerator = cache.GetStoredEntries(); - while (enumerator.MoveNext()) - { - var pair = enumerator.Current; - GrainId grain = pair.Key; - var entry = pair.Value; + // get the list of cached grains - SiloAddress owner = router.CalculateGrainDirectoryPartition(grain); - if (owner == null) // Null means there's no other silo and we're shutting down, so skip this entry - { - continue; - } + // Stats for debugging. + int ownedAndRemovedCount = 0, keptCount = 0, removedCount = 0, refreshedCount = 0; - if (entry == null) - { - // 0. If the entry was deleted in parallel, presumably due to cleanup after silo death - cache.Remove(grain); // for debug - cnt3++; - } - else if (!entry.IsExpired()) + // run through all cache entries + var enumerator = cache.GetStoredEntries(); + while (enumerator.MoveNext()) { - // 1. If the entry is not expired, skip it - cnt2++; // for debug - } - else if (entry.NumAccesses == 0) - { - // 2. If the entry is expired and was not accessed in the last time interval -- throw it away - cache.Remove(grain); // for debug - cnt3++; - } - else - { - // 3. If the entry is expired and was accessed in the last time interval, put into "fetch-batch-requests" list - if (!fetchInBatchList.TryGetValue(owner, out var list)) + var pair = enumerator.Current; + GrainId grain = pair.Key; + var entry = pair.Value; + + var owner = router.CalculateGrainDirectoryPartition(grain); + if (owner == null) // Null means there's no other silo and we're shutting down, so skip this entry { - fetchInBatchList[owner] = list = new List(); + continue; } - list.Add(grain); - // And reset the entry's access count for next time - entry.NumAccesses = 0; - cnt4++; // for debug - } - } - if (Log.IsEnabled(LogLevel.Trace)) - Log.LogTrace( - "Silo {SiloAddress} self-owned (and removed) {OwnedAndRemovedCount}, kept {KeptCount}, removed {RemovedCount} and tried to refresh {RefreshedCount} grains", - router.MyAddress, - cnt1, - cnt2, - cnt3, - cnt4); + if (entry == null) + { + // 0. If the entry was deleted in parallel, presumably due to cleanup after silo death + cache.Remove(grain); // for debug + removedCount++; + } + else if (!entry.IsExpired()) + { + // 1. If the entry is not expired, skip it + keptCount++; // for debug + } + else if (entry.NumAccesses == 0) + { + // 2. If the entry is expired and was not accessed in the last time interval -- throw it away + cache.Remove(grain); // for debug + removedCount++; + } + else + { + // 3. If the entry is expired and was accessed in the last time interval, put into "fetch-batch-requests" list + if (!fetchInBatchList.TryGetValue(owner, out var list)) + { + fetchInBatchList[owner] = list = new List(); + } + list.Add(grain); + // And reset the entry's access count for next time + entry.NumAccesses = 0; + refreshedCount++; // for debug + } + } - // send batch requests - SendBatchCacheRefreshRequests(fetchInBatchList); + if (Log.IsEnabled(LogLevel.Trace)) + Log.LogTrace( + "Silo {SiloAddress} self-owned (and removed) {OwnedAndRemovedCount}, kept {KeptCount}, removed {RemovedCount} and tried to refresh {RefreshedCount} grains", + router.MyAddress, + ownedAndRemovedCount, + keptCount, + removedCount, + refreshedCount); - ProduceStats(); + // send batch requests + SendBatchCacheRefreshRequests(fetchInBatchList); - // recheck every X seconds (Consider making it a configurable parameter) - await Task.Delay(SLEEP_TIME_BETWEEN_REFRESHES); + ProduceStats(); + } + catch (Exception ex) when (!_shutdownCts.IsCancellationRequested) + { + Log.LogError(ex, $"Error in {nameof(AdaptiveDirectoryCacheMaintainer)}."); + } } } @@ -130,7 +158,6 @@ private void SendBatchCacheRefreshRequests(Dictionary var silo = kv.Key; - DirectoryInstruments.ValidationsCacheSent.Add(1); // Send all of the items in one large request var validator = this.grainFactory.GetSystemTarget(Constants.DirectoryCacheValidatorType, silo); @@ -151,7 +178,7 @@ private void ProcessCacheRefreshResponse( { if (Log.IsEnabled(LogLevel.Trace)) Log.LogTrace("Silo {SiloAddress} received ProcessCacheRefreshResponse. #Response entries {Count}.", router.MyAddress, refreshResponse.Count); - int cnt1 = 0, cnt2 = 0, cnt3 = 0; + int otherSiloCount = 0, updatedCount = 0, unchangedCount = 0; // pass through returned results and update the cache if needed foreach (var tuple in refreshResponse) @@ -160,25 +187,28 @@ private void ProcessCacheRefreshResponse( { // the server returned an updated entry cache.AddOrUpdate(tuple.Address, tuple.VersionTag); - cnt1++; + otherSiloCount++; } - else if (tuple.VersionTag == -1) - { - // The server indicates that it does not own the grain anymore. - // It could be that by now, the cache has been already updated and contains an entry received from another server (i.e., current owner for the grain). - // For simplicity, we do not care about this corner case and simply remove the cache entry. - cache.Remove(tuple.Address.GrainId); - cnt2++; - } - else + else if (tuple.Address is { IsComplete: false }) { - // The server returned only a (not -1) generation number, indicating that we hold the most - // updated copy of the grain's activations list. - // Validate that the generation number in the request and the response are equal - // Contract.Assert(tuple.Item2 == refreshRequest.Find(o => o.Item1 == tuple.Item1).Item2); - // refresh the entry in the cache - cache.MarkAsFresh(tuple.Address.GrainId); - cnt3++; + if (tuple.VersionTag == -1) + { + // The server indicates that it does not own the grain anymore. + // It could be that by now, the cache has been already updated and contains an entry received from another server (i.e., current owner for the grain). + // For simplicity, we do not care about this corner case and simply remove the cache entry. + cache.Remove(tuple.Address.GrainId); + updatedCount++; + } + else + { + // The server returned only a (not -1) generation number, indicating that we hold the most + // updated copy of the grain's activations list. + // Validate that the generation number in the request and the response are equal + // Contract.Assert(tuple.Item2 == refreshRequest.Find(o => o.Item1 == tuple.Item1).Item2); + // refresh the entry in the cache + cache.MarkAsFresh(tuple.Address.GrainId); + unchangedCount++; + } } } @@ -187,9 +217,9 @@ private void ProcessCacheRefreshResponse( "Silo {SiloAddress} processed refresh response from {OtherSilo} with {UpdatedCount} updated, {RemovedCount} removed, {UnchangedCount} unchanged grains", router.MyAddress, silo, - cnt1, - cnt2, - cnt3); + otherSiloCount, + updatedCount, + unchangedCount); } /// diff --git a/src/Orleans.Runtime/GrainDirectory/ILocalGrainDirectory.cs b/src/Orleans.Runtime/GrainDirectory/ILocalGrainDirectory.cs index 831c860921..3d76145a7f 100644 --- a/src/Orleans.Runtime/GrainDirectory/ILocalGrainDirectory.cs +++ b/src/Orleans.Runtime/GrainDirectory/ILocalGrainDirectory.cs @@ -16,7 +16,7 @@ internal interface ILocalGrainDirectory : IDhtGrainDirectory /// /// Stops the local portion of the directory service. /// - void Stop(); + Task StopAsync(); RemoteGrainDirectory RemoteGrainDirectory { get; } RemoteGrainDirectory CacheValidator { get; } diff --git a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs index c67e292554..02627eb5cc 100644 --- a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs +++ b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs @@ -7,6 +7,7 @@ using Microsoft.Extensions.Options; using Orleans.Configuration; using Orleans.GrainDirectory; +using Orleans.Runtime.Scheduler; #nullable enable namespace Orleans.Runtime.GrainDirectory @@ -99,7 +100,7 @@ public void Start() Running = true; if (maintainer != null) { - maintainer.Start(); + CacheValidator.WorkItemGroup.QueueAction(maintainer.Start); } } @@ -109,7 +110,7 @@ public void Start() // The alternative would be to allow the silo to process requests after it has handed off its partition, in which case those changes // would receive successful responses but would not be reflected in the eventual state of the directory. // It's easy to change this, if we think the trade-off is better the other way. - public void Stop() + public async Task StopAsync() { // This will cause remote write requests to be forwarded to the silo that will become the new owner. // Requests might bounce back and forth for a while as membership stabilizes, but they will either be served by the @@ -119,7 +120,10 @@ public void Stop() //mark Running as false will exclude myself from CalculateGrainDirectoryPartition(grainId) Running = false; - maintainer?.Stop(); + if (maintainer is { } directoryCacheMaintainer) + { + await CacheValidator.QueueTask(directoryCacheMaintainer.StopAsync); + } DirectoryPartition.Clear(); DirectoryCache.Clear(); diff --git a/src/Orleans.Runtime/Silo/Silo.cs b/src/Orleans.Runtime/Silo/Silo.cs index 29095b4e43..1c686b51d0 100644 --- a/src/Orleans.Runtime/Silo/Silo.cs +++ b/src/Orleans.Runtime/Silo/Silo.cs @@ -269,12 +269,12 @@ private Task OnRuntimeServicesStart(CancellationToken ct) //TODO: Setup all (or as many as possible) of the class started in this call to work directly with lifecyce var stopWatch = Stopwatch.StartNew(); - StartTaskWithPerfAnalysis("Start local grain directory", LocalGrainDirectory.Start, stopWatch); - // This has to follow the above steps that start the runtime components CreateSystemTargets(); InjectDependencies(); + StartTaskWithPerfAnalysis("Start local grain directory", LocalGrainDirectory.Start, stopWatch); + return Task.CompletedTask; } @@ -519,20 +519,7 @@ private async Task OnBecomeActiveStop(CancellationToken ct) if (gracefully) { // Stop LocalGrainDirectory - var resolver = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - localGrainDirectory.CacheValidator.WorkItemGroup.QueueAction(() => - { - try - { - localGrainDirectory.Stop(); - resolver.TrySetResult(true); - } - catch (Exception exc) - { - resolver.TrySetException(exc); - } - }); - await resolver.Task; + await localGrainDirectory.CacheValidator.RunOrQueueTask(localGrainDirectory.StopAsync); try { diff --git a/test/DistributedTests/DistributedTests.Server/ServerCommand.cs b/test/DistributedTests/DistributedTests.Server/ServerCommand.cs index 2b8d049028..1f0e4c5112 100644 --- a/test/DistributedTests/DistributedTests.Server/ServerCommand.cs +++ b/test/DistributedTests/DistributedTests.Server/ServerCommand.cs @@ -19,7 +19,7 @@ public ServerCommand(ISiloConfigurator siloConfigurator) AddOption(OptionHelper.CreateOption("--siloPort", defaultValue: 11111)); AddOption(OptionHelper.CreateOption("--gatewayPort", defaultValue: 30000)); AddOption(OptionHelper.CreateOption("--secretSource", defaultValue: SecretConfiguration.SecretSource.File)); - AddOption(OptionHelper.CreateOption("--ActivationRepartitioning", defaultValue: true)); + AddOption(OptionHelper.CreateOption("--activationRepartitioning", defaultValue: false)); foreach (var opt in siloConfigurator.Options) { diff --git a/test/DistributedTests/README.md b/test/DistributedTests/README.md new file mode 100644 index 0000000000..ebfc9d7fa6 --- /dev/null +++ b/test/DistributedTests/README.md @@ -0,0 +1,37 @@ +# Distributed Tests + +## Running locally + +### Install crank and crank-agent + +```sh +dotnet tool install -g Microsoft.Crank.Controller --version 0.2.0-* +``` + +```sh +dotnet tool install -g Microsoft.Crank.Agent --version "0.2.0-*" +``` + +### Run crank agent + +Do this in a separate terminal. + +```sh +crank-agent --url http://*:5010 +``` + +### Build Orleans + +```sh +dotnet build -c Release +``` + +### Run crank scenario + +Run this from the root of the repository (next to Orleans.sln): + +```sh +crank --config .\distributed-tests.yml --scenario ping --profile local +``` + +Note: scenarios can be found in `distributed-tests.yml`. diff --git a/test/NonSilo.Tests/Directory/MockLocalGrainDirectory.cs b/test/NonSilo.Tests/Directory/MockLocalGrainDirectory.cs index dbb214944e..17f20f4b56 100644 --- a/test/NonSilo.Tests/Directory/MockLocalGrainDirectory.cs +++ b/test/NonSilo.Tests/Directory/MockLocalGrainDirectory.cs @@ -108,7 +108,7 @@ public void Start() throw new NotImplementedException(); } - public void Stop() + public Task StopAsync() { throw new NotImplementedException(); }