Skip to content

Commit

Permalink
Dispose cluster & silo health monitors are disposed when host is disp…
Browse files Browse the repository at this point in the history
…osed, and clean up code (#8999)
  • Loading branch information
ReubenBond committed May 15, 2024
1 parent 8657a00 commit d46b27f
Show file tree
Hide file tree
Showing 7 changed files with 164 additions and 130 deletions.
53 changes: 52 additions & 1 deletion src/Orleans.Runtime/MembershipService/ClusterHealthMonitor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ namespace Orleans.Runtime.MembershipService
/// <summary>
/// Responsible for ensuring that this silo monitors other silos in the cluster.
/// </summary>
internal class ClusterHealthMonitor : IHealthCheckParticipant, ILifecycleParticipant<ISiloLifecycle>, ClusterHealthMonitor.ITestAccessor
internal class ClusterHealthMonitor : IHealthCheckParticipant, ILifecycleParticipant<ISiloLifecycle>, ClusterHealthMonitor.ITestAccessor, IDisposable, IAsyncDisposable
{
private readonly CancellationTokenSource shutdownCancellation = new CancellationTokenSource();
private readonly ILocalSiloDetails localSiloDetails;
Expand Down Expand Up @@ -284,5 +284,56 @@ bool IHealthCheckable.CheckHealth(DateTime lastCheckTime, out string reason)

return ok;
}

/// <summary>
/// Cancels the shutdown token and synchronously disposes each monitor in <c>monitoredSilos</c>.
/// </summary>
/// <remarks>
/// Exceptions thrown by the cancellation call or by an individual monitor's <c>Dispose</c> are logged
/// and not rethrown, so a failure in one monitor does not stop the remaining monitors from being disposed.
/// </remarks>
public void Dispose()
{
    // Request cancellation before tearing down the individual monitors.
    try
    {
        shutdownCancellation.Cancel();
    }
    catch (Exception cancelError)
    {
        log.LogError(cancelError, "Error cancelling shutdown token.");
    }

    var monitors = monitoredSilos.Values;
    foreach (var siloMonitor in monitors)
    {
        // Each monitor is guarded separately so that one failure is isolated from the rest.
        try
        {
            siloMonitor.Dispose();
        }
        catch (Exception disposeError)
        {
            log.LogError(disposeError, "Error disposing monitor for {SiloAddress}.", siloMonitor.SiloAddress);
        }
    }
}

/// <summary>
/// Cancels the shutdown token, starts asynchronous disposal of each monitor in <c>monitoredSilos</c>,
/// and waits for all of the started disposals to finish.
/// </summary>
/// <remarks>
/// Exceptions thrown synchronously while cancelling or while starting a monitor's disposal are logged.
/// The final wait uses <see cref="ConfigureAwaitOptions.SuppressThrowing"/>, so faults carried by the
/// disposal tasks themselves are not rethrown from this method.
/// </remarks>
/// <returns>A <see cref="ValueTask"/> which completes once every started disposal task has completed.</returns>
public async ValueTask DisposeAsync()
{
    // Request cancellation before tearing down the individual monitors.
    try
    {
        shutdownCancellation.Cancel();
    }
    catch (Exception cancelError)
    {
        log.LogError(cancelError, "Error cancelling shutdown token.");
    }

    // Start every disposal first, then wait for them together.
    List<Task> pending = new();
    foreach (var siloMonitor in monitoredSilos.Values)
    {
        try
        {
            var disposal = siloMonitor.DisposeAsync();
            pending.Add(disposal.AsTask());
        }
        catch (Exception disposeError)
        {
            log.LogError(disposeError, "Error disposing monitor for {SiloAddress}.", siloMonitor.SiloAddress);
        }
    }

    var allDisposed = Task.WhenAll(pending);
    await allDisposed.ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
}
}
}
7 changes: 5 additions & 2 deletions src/Orleans.Runtime/MembershipService/IRemoteSiloProber.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Threading;
using System.Threading.Tasks;

namespace Orleans.Runtime.MembershipService
Expand All @@ -13,10 +14,11 @@ internal interface IRemoteSiloProber
/// </summary>
/// <param name="silo">The silo to probe.</param>
/// <param name="probeNumber">The probe identifier for diagnostic purposes.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>
/// A <see cref="Task"/> which completes when the probe returns successfully and faults when the probe fails.
/// </returns>
Task Probe(SiloAddress silo, int probeNumber);
Task Probe(SiloAddress silo, int probeNumber, CancellationToken cancellationToken = default);

/// <summary>
/// Probes the specified <paramref name="target"/> indirectly, via <paramref name="intermediary"/>.
Expand All @@ -25,6 +27,7 @@ internal interface IRemoteSiloProber
/// <param name="target">The silo which will be probed.</param>
/// <param name="probeTimeout">The timeout which the <paramref name="intermediary" /> should apply to the probe.</param>
/// <param name="probeNumber">The probe number for diagnostic purposes.</param>
Task<IndirectProbeResponse> ProbeIndirectly(SiloAddress intermediary, SiloAddress target, TimeSpan probeTimeout, int probeNumber);
/// <param name="cancellationToken">The cancellation token.</param>
Task<IndirectProbeResponse> ProbeIndirectly(SiloAddress intermediary, SiloAddress target, TimeSpan probeTimeout, int probeNumber, CancellationToken cancellationToken = default);
}
}
27 changes: 2 additions & 25 deletions src/Orleans.Runtime/MembershipService/MembershipAgent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -229,31 +229,8 @@ static async Task<bool> ProbeSilo(IRemoteSiloProber siloProber, SiloAddress silo
Exception exception;
try
{
using var cancellation = new CancellationTokenSource(timeout);
var probeTask = siloProber.Probe(silo, 0);
var cancellationTask = cancellation.Token.WhenCancelled();
var completedTask = await Task.WhenAny(probeTask, cancellationTask).ConfigureAwait(false);

if (ReferenceEquals(completedTask, probeTask))
{
cancellation.Cancel();
if (probeTask.IsFaulted)
{
exception = probeTask.Exception;
}
else if (probeTask.Status == TaskStatus.RanToCompletion)
{
return true;
}
else
{
exception = null;
}
}
else
{
exception = null;
}
await siloProber.Probe(silo, 0).WaitAsync(timeout);
return true;
}
catch (Exception ex)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ private Task ProbeInternal(SiloAddress remoteSilo, int probeNumber)
var remoteOracle = this.grainFactory.GetSystemTarget<IMembershipService>(Constants.MembershipServiceType, remoteSilo);
task = remoteOracle.Ping(probeNumber);

// Update stats counter. Only count Pings that were successfuly sent, but not necessarily replied to.
// Update stats counter. Only count Pings that were successfully sent, but not necessarily replied to.
MessagingInstruments.OnPingSend(remoteSilo);
}
finally
Expand Down
36 changes: 15 additions & 21 deletions src/Orleans.Runtime/MembershipService/RemoteSiloProber.cs
Original file line number Diff line number Diff line change
@@ -1,31 +1,25 @@
#nullable enable
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;

namespace Orleans.Runtime.MembershipService
namespace Orleans.Runtime.MembershipService;

/// <inheritdoc />
internal class RemoteSiloProber(IServiceProvider serviceProvider) : IRemoteSiloProber
{
/// <inheritdoc />
internal class RemoteSiloProber : IRemoteSiloProber
public async Task Probe(SiloAddress remoteSilo, int probeNumber, CancellationToken cancellationToken)
{
private readonly IServiceProvider serviceProvider;

public RemoteSiloProber(IServiceProvider serviceProvider)
{
this.serviceProvider = serviceProvider;
}

/// <inheritdoc />
public Task Probe(SiloAddress remoteSilo, int probeNumber)
{
var systemTarget = this.serviceProvider.GetRequiredService<MembershipSystemTarget>();
return systemTarget.ProbeRemoteSilo(remoteSilo, probeNumber);
}
var systemTarget = serviceProvider.GetRequiredService<MembershipSystemTarget>();
await systemTarget.ProbeRemoteSilo(remoteSilo, probeNumber).WaitAsync(cancellationToken);
}

/// <inheritdoc />
public Task<IndirectProbeResponse> ProbeIndirectly(SiloAddress intermediary, SiloAddress target, TimeSpan probeTimeout, int probeNumber)
{
var systemTarget = this.serviceProvider.GetRequiredService<MembershipSystemTarget>();
return systemTarget.ProbeRemoteSiloIndirectly(intermediary, target, probeTimeout, probeNumber);
}
/// <inheritdoc />
public async Task<IndirectProbeResponse> ProbeIndirectly(SiloAddress intermediary, SiloAddress target, TimeSpan probeTimeout, int probeNumber, CancellationToken cancellationToken)
{
var systemTarget = serviceProvider.GetRequiredService<MembershipSystemTarget>();
return await systemTarget.ProbeRemoteSiloIndirectly(intermediary, target, probeTimeout, probeNumber).WaitAsync(cancellationToken);
}
}
135 changes: 65 additions & 70 deletions src/Orleans.Runtime/MembershipService/SiloHealthMonitor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,16 @@ namespace Orleans.Runtime.MembershipService
/// <summary>
/// Responsible for monitoring an individual remote silo.
/// </summary>
internal class SiloHealthMonitor : ITestAccessor, IHealthCheckable
internal class SiloHealthMonitor : ITestAccessor, IHealthCheckable, IDisposable, IAsyncDisposable
{
private readonly ILogger _log;
private readonly IOptionsMonitor<ClusterMembershipOptions> _clusterMembershipOptions;
private readonly IRemoteSiloProber _prober;
private readonly ILocalSiloHealthMonitor _localSiloHealthMonitor;
private readonly IClusterMembershipService _membershipService;
private readonly ILocalSiloDetails _localSiloDetails;
private readonly CancellationTokenSource _stoppingCancellation = new CancellationTokenSource();
private readonly object _lockObj = new object();
private readonly CancellationTokenSource _stoppingCancellation = new();
private readonly object _lockObj = new();
private readonly IAsyncTimer _pingTimer;
private ValueStopwatch _elapsedSinceLastSuccessfulResponse;
private readonly Func<SiloHealthMonitor, ProbeResult, Task> _onProbeResult;
Expand Down Expand Up @@ -118,6 +118,16 @@ public void Start()
/// Stop the monitor.
/// </summary>
public async Task StopAsync(CancellationToken cancellationToken)
{
    // Stopping is disposal followed by a bounded wait for the run task.
    Dispose();

    if (_runTask is not Task runTask)
    {
        // No run task has been assigned, so there is nothing to wait for.
        return;
    }

    // Wait for the run task, giving up when the caller's token is cancelled.
    // SuppressThrowing means neither a fault in the run task nor the cancellation is rethrown here.
    var boundedWait = runTask.WaitAsync(cancellationToken);
    await boundedWait.ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
}

public void Dispose()
{
lock (_lockObj)
{
Expand All @@ -129,10 +139,14 @@ public async Task StopAsync(CancellationToken cancellationToken)
_stoppingCancellation.Cancel();
_pingTimer.Dispose();
}
}

public async ValueTask DisposeAsync()
{
Dispose();
if (_runTask is Task task)
{
await Task.WhenAny(task, cancellationToken.WhenCancelled());
await task.ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
}
}

Expand Down Expand Up @@ -238,20 +252,12 @@ private async Task<ProbeResult> ProbeDirectly(CancellationToken cancellation)
Exception failureException;
try
{
var probeCancellation = cancellation.WhenCancelled();
var probeTask = _prober.Probe(SiloAddress, id);
var task = await Task.WhenAny(probeCancellation, probeTask);

if (ReferenceEquals(task, probeCancellation) && probeTask.Status != TaskStatus.RanToCompletion)
{
probeTask.Ignore();
failureException = new OperationCanceledException($"The ping attempt was cancelled after {roundTripTimer.Elapsed}. Ping #{id}");
}
else
{
await probeTask;
failureException = null;
}
await _prober.Probe(SiloAddress, id, cancellation).WaitAsync(cancellation);
failureException = null;
}
catch (OperationCanceledException exception)
{
failureException = new OperationCanceledException($"The ping attempt was cancelled after {roundTripTimer.Elapsed}. Ping #{id}", exception);
}
catch (Exception exception)
{
Expand Down Expand Up @@ -320,74 +326,63 @@ private async Task<ProbeResult> ProbeIndirectly(SiloAddress intermediary, TimeSp
try
{
using var cancellationSource = CancellationTokenSource.CreateLinkedTokenSource(cancellation, _stoppingCancellation.Token);
var cancellationTask = cancellationSource.Token.WhenCancelled();
var probeTask = _prober.ProbeIndirectly(intermediary, SiloAddress, directProbeTimeout, id);
var task = await Task.WhenAny(cancellationTask, probeTask);
var indirectResult = await _prober.ProbeIndirectly(intermediary, SiloAddress, directProbeTimeout, id, cancellationSource.Token).WaitAsync(cancellationSource.Token);
roundTripTimer.Stop();
var roundTripTime = roundTripTimer.Elapsed - indirectResult.ProbeResponseTime;

// Record timing regardless of the result.
_elapsedSinceLastSuccessfulResponse.Restart();
LastRoundTripTime = roundTripTime;

if (ReferenceEquals(task, cancellationTask) && probeTask.Status != TaskStatus.RanToCompletion)
if (indirectResult.Succeeded)
{
probeTask.Ignore();
probeResult = ProbeResult.CreateIndirect(_failedProbes, ProbeResultStatus.Unknown, default);
_log.LogInformation(
"Indirect probe request #{Id} to silo {SiloAddress} via silo {IntermediarySiloAddress} succeeded after {RoundTripTime} with a direct probe response time of {ProbeResponseTime}.",
id,
SiloAddress,
intermediary,
roundTripTimer.Elapsed,
indirectResult.ProbeResponseTime);

MessagingInstruments.OnPingReplyReceived(SiloAddress);

_failedProbes = 0;
probeResult = ProbeResult.CreateIndirect(0, ProbeResultStatus.Succeeded, indirectResult);
}
else
{
var indirectResult = await probeTask;
roundTripTimer.Stop();
var roundTripTime = roundTripTimer.Elapsed - indirectResult.ProbeResponseTime;
MessagingInstruments.OnPingReplyMissed(SiloAddress);

// Record timing regardless of the result.
_elapsedSinceLastSuccessfulResponse.Restart();
LastRoundTripTime = roundTripTimer.Elapsed - indirectResult.ProbeResponseTime;

if (indirectResult.Succeeded)
if (indirectResult.IntermediaryHealthScore > 0)
{
_log.LogInformation(
"Indirect probe request #{Id} to silo {SiloAddress} via silo {IntermediarySiloAddress} succeeded after {RoundTripTime} with a direct probe response time of {ProbeResponseTime}.",
"Ignoring failure result for ping #{Id} from {Silo} since the intermediary used to probe the silo is not healthy. Intermediary health degradation score: {IntermediaryHealthScore}.",
id,
SiloAddress,
intermediary,
roundTripTimer.Elapsed,
indirectResult.ProbeResponseTime);

MessagingInstruments.OnPingReplyReceived(SiloAddress);

_failedProbes = 0;
probeResult = ProbeResult.CreateIndirect(0, ProbeResultStatus.Succeeded, indirectResult);
indirectResult.IntermediaryHealthScore);
probeResult = ProbeResult.CreateIndirect(_failedProbes, ProbeResultStatus.Unknown, indirectResult);
}
else
{
MessagingInstruments.OnPingReplyMissed(SiloAddress);
_log.LogWarning(
"Indirect probe request #{Id} to silo {SiloAddress} via silo {IntermediarySiloAddress} failed after {RoundTripTime} with a direct probe response time of {ProbeResponseTime}. Failure message: {FailureMessage}. Intermediary health score: {IntermediaryHealthScore}.",
id,
SiloAddress,
intermediary,
roundTripTimer.Elapsed,
indirectResult.ProbeResponseTime,
indirectResult.FailureMessage,
indirectResult.IntermediaryHealthScore);

if (indirectResult.IntermediaryHealthScore > 0)
{
_log.LogInformation(
"Ignoring failure result for ping #{Id} from {Silo} since the intermediary used to probe the silo is not healthy. Intermediary health degradation score: {IntermediaryHealthScore}",
id,
SiloAddress,
indirectResult.IntermediaryHealthScore);
probeResult = ProbeResult.CreateIndirect(_failedProbes, ProbeResultStatus.Unknown, indirectResult);
}
else
{
_log.LogWarning(
"Indirect probe request #{Id} to silo {SiloAddress} via silo {IntermediarySiloAddress} failed after {RoundTripTime} with a direct probe response time of {ProbeResponseTime}. Failure message: {FailureMessage}. Intermediary health score: {IntermediaryHealthScore}",
id,
SiloAddress,
intermediary,
roundTripTimer.Elapsed,
indirectResult.ProbeResponseTime,
indirectResult.FailureMessage,
indirectResult.IntermediaryHealthScore);

var missed = ++_failedProbes;
probeResult = ProbeResult.CreateIndirect(missed, ProbeResultStatus.Failed, indirectResult);
}
var missed = ++_failedProbes;
probeResult = ProbeResult.CreateIndirect(missed, ProbeResultStatus.Failed, indirectResult);
}
}
}
catch (Exception exception)
{
_log.LogWarning(exception, "Indirect probe request failed");
MessagingInstruments.OnPingReplyMissed(SiloAddress);
_log.LogWarning(exception, "Indirect probe request failed.");
probeResult = ProbeResult.CreateIndirect(_failedProbes, ProbeResultStatus.Unknown, default);
}

Expand All @@ -412,10 +407,10 @@ private ProbeResult(int failedProbeCount, ProbeResultStatus status, bool isDirec
}

public static ProbeResult CreateDirect(int failedProbeCount, ProbeResultStatus status)
=> new ProbeResult(failedProbeCount, status, isDirectProbe: true, 0);
=> new(failedProbeCount, status, isDirectProbe: true, 0);

public static ProbeResult CreateIndirect(int failedProbeCount, ProbeResultStatus status, IndirectProbeResponse indirectProbeResponse)
=> new ProbeResult(failedProbeCount, status, isDirectProbe: false, indirectProbeResponse.IntermediaryHealthScore);
=> new(failedProbeCount, status, isDirectProbe: false, indirectProbeResponse.IntermediaryHealthScore);

public int FailedProbeCount { get; }

Expand All @@ -426,7 +421,7 @@ public static ProbeResult CreateIndirect(int failedProbeCount, ProbeResultStatus
public int IntermediaryHealthDegradationScore { get; }
}

public enum ProbeResultStatus : byte
public enum ProbeResultStatus
{
Unknown,
Failed,
Expand Down
Loading

0 comments on commit d46b27f

Please sign in to comment.