diff --git a/src/Orleans.Runtime/MembershipService/ClusterHealthMonitor.cs b/src/Orleans.Runtime/MembershipService/ClusterHealthMonitor.cs
index 8909ce9037..a593f904c9 100644
--- a/src/Orleans.Runtime/MembershipService/ClusterHealthMonitor.cs
+++ b/src/Orleans.Runtime/MembershipService/ClusterHealthMonitor.cs
@@ -17,7 +17,7 @@ namespace Orleans.Runtime.MembershipService
///
/// Responsible for ensuring that this silo monitors other silos in the cluster.
///
- internal class ClusterHealthMonitor : IHealthCheckParticipant, ILifecycleParticipant, ClusterHealthMonitor.ITestAccessor
+ internal class ClusterHealthMonitor : IHealthCheckParticipant, ILifecycleParticipant, ClusterHealthMonitor.ITestAccessor, IDisposable, IAsyncDisposable
{
private readonly CancellationTokenSource shutdownCancellation = new CancellationTokenSource();
private readonly ILocalSiloDetails localSiloDetails;
@@ -284,5 +284,76 @@ bool IHealthCheckable.CheckHealth(DateTime lastCheckTime, out string reason)
return ok;
}
+
+        /// <summary>
+        /// Signals shutdown and synchronously disposes the monitor for every monitored silo.
+        /// </summary>
+        /// <remarks>
+        /// Failures are logged instead of thrown so that one faulty monitor cannot prevent the others from being disposed.
+        /// </remarks>
+        public void Dispose()
+        {
+            CancelShutdown();
+
+            foreach (var monitor in monitoredSilos.Values)
+            {
+                try
+                {
+                    monitor.Dispose();
+                }
+                catch (Exception exception)
+                {
+                    log.LogError(exception, "Error disposing monitor for {SiloAddress}.", monitor.SiloAddress);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Signals shutdown, starts disposing every silo monitor, and waits until all of them have finished.
+        /// </summary>
+        /// <returns>A <see cref="ValueTask"/> which completes once every started disposal has ended, successfully or not.</returns>
+        public async ValueTask DisposeAsync()
+        {
+            CancelShutdown();
+
+            // Start every disposal before awaiting any of them so that the monitors shut down concurrently.
+            var pending = new List<(SiloAddress Silo, Task Completion)>();
+            foreach (var monitor in monitoredSilos.Values)
+            {
+                try
+                {
+                    pending.Add((monitor.SiloAddress, monitor.DisposeAsync().AsTask()));
+                }
+                catch (Exception exception)
+                {
+                    log.LogError(exception, "Error disposing monitor for {SiloAddress}.", monitor.SiloAddress);
+                }
+            }
+
+            foreach (var (silo, completion) in pending)
+            {
+                // SuppressThrowing stops one failed disposal from aborting the loop; the fault is logged below instead of being lost.
+                await completion.ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
+                if (completion.IsFaulted)
+                {
+                    log.LogError(completion.Exception, "Error disposing monitor for {SiloAddress}.", silo);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Cancels the shutdown token, logging any exception thrown by the cancellation instead of propagating it.
+        /// </summary>
+        private void CancelShutdown()
+        {
+            try
+            {
+                shutdownCancellation.Cancel();
+            }
+            catch (Exception exception)
+            {
+                log.LogError(exception, "Error cancelling shutdown token.");
+            }
+        }
}
}
diff --git a/src/Orleans.Runtime/MembershipService/IRemoteSiloProber.cs b/src/Orleans.Runtime/MembershipService/IRemoteSiloProber.cs
index 1dae452447..3ee50818b8 100644
--- a/src/Orleans.Runtime/MembershipService/IRemoteSiloProber.cs
+++ b/src/Orleans.Runtime/MembershipService/IRemoteSiloProber.cs
@@ -1,4 +1,5 @@
using System;
+using System.Threading;
using System.Threading.Tasks;
namespace Orleans.Runtime.MembershipService
@@ -13,10 +14,11 @@ internal interface IRemoteSiloProber
///
/// The silo to probe.
/// The probe identifier for diagnostic purposes.
+        /// <param name="cancellationToken">The cancellation token.</param>
///
/// A which completes when the probe returns successfully and faults when the probe fails.
///
- Task Probe(SiloAddress silo, int probeNumber);
+ Task Probe(SiloAddress silo, int probeNumber, CancellationToken cancellationToken = default);
///
/// Probes the specified indirectly, via .
@@ -25,6 +27,7 @@ internal interface IRemoteSiloProber
/// The silo which will be probed.
/// The timeout which the should apply to the probe.
/// The probe number for diagnostic purposes.
- Task ProbeIndirectly(SiloAddress intermediary, SiloAddress target, TimeSpan probeTimeout, int probeNumber);
+        /// <param name="cancellationToken">The cancellation token.</param>
+        Task<IndirectProbeResponse> ProbeIndirectly(SiloAddress intermediary, SiloAddress target, TimeSpan probeTimeout, int probeNumber, CancellationToken cancellationToken = default);
}
}
diff --git a/src/Orleans.Runtime/MembershipService/MembershipAgent.cs b/src/Orleans.Runtime/MembershipService/MembershipAgent.cs
index 277dba9e15..df61479775 100644
--- a/src/Orleans.Runtime/MembershipService/MembershipAgent.cs
+++ b/src/Orleans.Runtime/MembershipService/MembershipAgent.cs
@@ -229,31 +229,8 @@ static async Task ProbeSilo(IRemoteSiloProber siloProber, SiloAddress silo
Exception exception;
try
{
- using var cancellation = new CancellationTokenSource(timeout);
- var probeTask = siloProber.Probe(silo, 0);
- var cancellationTask = cancellation.Token.WhenCancelled();
- var completedTask = await Task.WhenAny(probeTask, cancellationTask).ConfigureAwait(false);
-
- if (ReferenceEquals(completedTask, probeTask))
- {
- cancellation.Cancel();
- if (probeTask.IsFaulted)
- {
- exception = probeTask.Exception;
- }
- else if (probeTask.Status == TaskStatus.RanToCompletion)
- {
- return true;
- }
- else
- {
- exception = null;
- }
- }
- else
- {
- exception = null;
- }
+ await siloProber.Probe(silo, 0).WaitAsync(timeout);
+ return true;
}
catch (Exception ex)
{
diff --git a/src/Orleans.Runtime/MembershipService/MembershipSystemTarget.cs b/src/Orleans.Runtime/MembershipService/MembershipSystemTarget.cs
index 5d2f5b78ec..293cbb795f 100644
--- a/src/Orleans.Runtime/MembershipService/MembershipSystemTarget.cs
+++ b/src/Orleans.Runtime/MembershipService/MembershipSystemTarget.cs
@@ -180,7 +180,7 @@ private Task ProbeInternal(SiloAddress remoteSilo, int probeNumber)
var remoteOracle = this.grainFactory.GetSystemTarget(Constants.MembershipServiceType, remoteSilo);
task = remoteOracle.Ping(probeNumber);
- // Update stats counter. Only count Pings that were successfuly sent, but not necessarily replied to.
+ // Update stats counter. Only count Pings that were successfully sent, but not necessarily replied to.
MessagingInstruments.OnPingSend(remoteSilo);
}
finally
diff --git a/src/Orleans.Runtime/MembershipService/RemoteSiloProber.cs b/src/Orleans.Runtime/MembershipService/RemoteSiloProber.cs
index 189fce1bf4..01bdecc4f5 100644
--- a/src/Orleans.Runtime/MembershipService/RemoteSiloProber.cs
+++ b/src/Orleans.Runtime/MembershipService/RemoteSiloProber.cs
@@ -1,31 +1,25 @@
+#nullable enable
using System;
+using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
-namespace Orleans.Runtime.MembershipService
+namespace Orleans.Runtime.MembershipService;
+
+/// <summary><see cref="IRemoteSiloProber"/> implementation which forwards each probe to the <see cref="MembershipSystemTarget"/> resolved from the service provider.</summary>
+internal class RemoteSiloProber(IServiceProvider serviceProvider) : IRemoteSiloProber
{
///
- internal class RemoteSiloProber : IRemoteSiloProber
+    public async Task Probe(SiloAddress remoteSilo, int probeNumber, CancellationToken cancellationToken)
{
- private readonly IServiceProvider serviceProvider;
-
- public RemoteSiloProber(IServiceProvider serviceProvider)
- {
- this.serviceProvider = serviceProvider;
- }
-
- ///
- public Task Probe(SiloAddress remoteSilo, int probeNumber)
- {
- var systemTarget = this.serviceProvider.GetRequiredService();
- return systemTarget.ProbeRemoteSilo(remoteSilo, probeNumber);
- }
+        var systemTarget = serviceProvider.GetRequiredService<MembershipSystemTarget>();
+        await systemTarget.ProbeRemoteSilo(remoteSilo, probeNumber).WaitAsync(cancellationToken);
+    }
- ///
- public Task ProbeIndirectly(SiloAddress intermediary, SiloAddress target, TimeSpan probeTimeout, int probeNumber)
- {
- var systemTarget = this.serviceProvider.GetRequiredService();
- return systemTarget.ProbeRemoteSiloIndirectly(intermediary, target, probeTimeout, probeNumber);
- }
+    /// <inheritdoc />
+    public async Task<IndirectProbeResponse> ProbeIndirectly(SiloAddress intermediary, SiloAddress target, TimeSpan probeTimeout, int probeNumber, CancellationToken cancellationToken)
+    {
+        var systemTarget = serviceProvider.GetRequiredService<MembershipSystemTarget>();
+        return await systemTarget.ProbeRemoteSiloIndirectly(intermediary, target, probeTimeout, probeNumber).WaitAsync(cancellationToken);
}
}
diff --git a/src/Orleans.Runtime/MembershipService/SiloHealthMonitor.cs b/src/Orleans.Runtime/MembershipService/SiloHealthMonitor.cs
index f6a90e30b6..c8491fa5f9 100644
--- a/src/Orleans.Runtime/MembershipService/SiloHealthMonitor.cs
+++ b/src/Orleans.Runtime/MembershipService/SiloHealthMonitor.cs
@@ -15,7 +15,7 @@ namespace Orleans.Runtime.MembershipService
///
/// Responsible for monitoring an individual remote silo.
///
- internal class SiloHealthMonitor : ITestAccessor, IHealthCheckable
+ internal class SiloHealthMonitor : ITestAccessor, IHealthCheckable, IDisposable, IAsyncDisposable
{
private readonly ILogger _log;
private readonly IOptionsMonitor _clusterMembershipOptions;
@@ -23,8 +23,8 @@ internal class SiloHealthMonitor : ITestAccessor, IHealthCheckable
private readonly ILocalSiloHealthMonitor _localSiloHealthMonitor;
private readonly IClusterMembershipService _membershipService;
private readonly ILocalSiloDetails _localSiloDetails;
- private readonly CancellationTokenSource _stoppingCancellation = new CancellationTokenSource();
- private readonly object _lockObj = new object();
+ private readonly CancellationTokenSource _stoppingCancellation = new();
+ private readonly object _lockObj = new();
private readonly IAsyncTimer _pingTimer;
private ValueStopwatch _elapsedSinceLastSuccessfulResponse;
private readonly Func _onProbeResult;
@@ -118,6 +118,16 @@ public void Start()
/// Stop the monitor.
///
public async Task StopAsync(CancellationToken cancellationToken)
+ {
+ Dispose();
+
+ if (_runTask is Task task)
+ {
+ await task.WaitAsync(cancellationToken).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
+ }
+ }
+
+ public void Dispose()
{
lock (_lockObj)
{
@@ -129,10 +139,14 @@ public async Task StopAsync(CancellationToken cancellationToken)
_stoppingCancellation.Cancel();
_pingTimer.Dispose();
}
+ }
+ public async ValueTask DisposeAsync()
+ {
+ Dispose();
if (_runTask is Task task)
{
- await Task.WhenAny(task, cancellationToken.WhenCancelled());
+ await task.ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
}
}
@@ -238,20 +252,12 @@ private async Task ProbeDirectly(CancellationToken cancellation)
Exception failureException;
try
{
- var probeCancellation = cancellation.WhenCancelled();
- var probeTask = _prober.Probe(SiloAddress, id);
- var task = await Task.WhenAny(probeCancellation, probeTask);
-
- if (ReferenceEquals(task, probeCancellation) && probeTask.Status != TaskStatus.RanToCompletion)
- {
- probeTask.Ignore();
- failureException = new OperationCanceledException($"The ping attempt was cancelled after {roundTripTimer.Elapsed}. Ping #{id}");
- }
- else
- {
- await probeTask;
- failureException = null;
- }
+ await _prober.Probe(SiloAddress, id, cancellation).WaitAsync(cancellation);
+ failureException = null;
+ }
+ catch (OperationCanceledException exception)
+ {
+ failureException = new OperationCanceledException($"The ping attempt was cancelled after {roundTripTimer.Elapsed}. Ping #{id}", exception);
}
catch (Exception exception)
{
@@ -320,74 +326,63 @@ private async Task ProbeIndirectly(SiloAddress intermediary, TimeSp
try
{
using var cancellationSource = CancellationTokenSource.CreateLinkedTokenSource(cancellation, _stoppingCancellation.Token);
- var cancellationTask = cancellationSource.Token.WhenCancelled();
- var probeTask = _prober.ProbeIndirectly(intermediary, SiloAddress, directProbeTimeout, id);
- var task = await Task.WhenAny(cancellationTask, probeTask);
+ var indirectResult = await _prober.ProbeIndirectly(intermediary, SiloAddress, directProbeTimeout, id, cancellationSource.Token).WaitAsync(cancellationSource.Token);
+ roundTripTimer.Stop();
+ var roundTripTime = roundTripTimer.Elapsed - indirectResult.ProbeResponseTime;
+
+ // Record timing regardless of the result.
+ _elapsedSinceLastSuccessfulResponse.Restart();
+ LastRoundTripTime = roundTripTime;
- if (ReferenceEquals(task, cancellationTask) && probeTask.Status != TaskStatus.RanToCompletion)
+ if (indirectResult.Succeeded)
{
- probeTask.Ignore();
- probeResult = ProbeResult.CreateIndirect(_failedProbes, ProbeResultStatus.Unknown, default);
+ _log.LogInformation(
+ "Indirect probe request #{Id} to silo {SiloAddress} via silo {IntermediarySiloAddress} succeeded after {RoundTripTime} with a direct probe response time of {ProbeResponseTime}.",
+ id,
+ SiloAddress,
+ intermediary,
+ roundTripTimer.Elapsed,
+ indirectResult.ProbeResponseTime);
+
+ MessagingInstruments.OnPingReplyReceived(SiloAddress);
+
+ _failedProbes = 0;
+ probeResult = ProbeResult.CreateIndirect(0, ProbeResultStatus.Succeeded, indirectResult);
}
else
{
- var indirectResult = await probeTask;
- roundTripTimer.Stop();
- var roundTripTime = roundTripTimer.Elapsed - indirectResult.ProbeResponseTime;
+ MessagingInstruments.OnPingReplyMissed(SiloAddress);
- // Record timing regardless of the result.
- _elapsedSinceLastSuccessfulResponse.Restart();
- LastRoundTripTime = roundTripTimer.Elapsed - indirectResult.ProbeResponseTime;
-
- if (indirectResult.Succeeded)
+ if (indirectResult.IntermediaryHealthScore > 0)
{
_log.LogInformation(
- "Indirect probe request #{Id} to silo {SiloAddress} via silo {IntermediarySiloAddress} succeeded after {RoundTripTime} with a direct probe response time of {ProbeResponseTime}.",
+ "Ignoring failure result for ping #{Id} from {Silo} since the intermediary used to probe the silo is not healthy. Intermediary health degradation score: {IntermediaryHealthScore}.",
id,
SiloAddress,
- intermediary,
- roundTripTimer.Elapsed,
- indirectResult.ProbeResponseTime);
-
- MessagingInstruments.OnPingReplyReceived(SiloAddress);
-
- _failedProbes = 0;
- probeResult = ProbeResult.CreateIndirect(0, ProbeResultStatus.Succeeded, indirectResult);
+ indirectResult.IntermediaryHealthScore);
+ probeResult = ProbeResult.CreateIndirect(_failedProbes, ProbeResultStatus.Unknown, indirectResult);
}
else
{
- MessagingInstruments.OnPingReplyMissed(SiloAddress);
+ _log.LogWarning(
+ "Indirect probe request #{Id} to silo {SiloAddress} via silo {IntermediarySiloAddress} failed after {RoundTripTime} with a direct probe response time of {ProbeResponseTime}. Failure message: {FailureMessage}. Intermediary health score: {IntermediaryHealthScore}.",
+ id,
+ SiloAddress,
+ intermediary,
+ roundTripTimer.Elapsed,
+ indirectResult.ProbeResponseTime,
+ indirectResult.FailureMessage,
+ indirectResult.IntermediaryHealthScore);
- if (indirectResult.IntermediaryHealthScore > 0)
- {
- _log.LogInformation(
- "Ignoring failure result for ping #{Id} from {Silo} since the intermediary used to probe the silo is not healthy. Intermediary health degradation score: {IntermediaryHealthScore}",
- id,
- SiloAddress,
- indirectResult.IntermediaryHealthScore);
- probeResult = ProbeResult.CreateIndirect(_failedProbes, ProbeResultStatus.Unknown, indirectResult);
- }
- else
- {
- _log.LogWarning(
- "Indirect probe request #{Id} to silo {SiloAddress} via silo {IntermediarySiloAddress} failed after {RoundTripTime} with a direct probe response time of {ProbeResponseTime}. Failure message: {FailureMessage}. Intermediary health score: {IntermediaryHealthScore}",
- id,
- SiloAddress,
- intermediary,
- roundTripTimer.Elapsed,
- indirectResult.ProbeResponseTime,
- indirectResult.FailureMessage,
- indirectResult.IntermediaryHealthScore);
-
- var missed = ++_failedProbes;
- probeResult = ProbeResult.CreateIndirect(missed, ProbeResultStatus.Failed, indirectResult);
- }
+ var missed = ++_failedProbes;
+ probeResult = ProbeResult.CreateIndirect(missed, ProbeResultStatus.Failed, indirectResult);
}
}
}
catch (Exception exception)
{
- _log.LogWarning(exception, "Indirect probe request failed");
+ MessagingInstruments.OnPingReplyMissed(SiloAddress);
+ _log.LogWarning(exception, "Indirect probe request failed.");
probeResult = ProbeResult.CreateIndirect(_failedProbes, ProbeResultStatus.Unknown, default);
}
@@ -412,10 +407,10 @@ private ProbeResult(int failedProbeCount, ProbeResultStatus status, bool isDirec
}
public static ProbeResult CreateDirect(int failedProbeCount, ProbeResultStatus status)
- => new ProbeResult(failedProbeCount, status, isDirectProbe: true, 0);
+ => new(failedProbeCount, status, isDirectProbe: true, 0);
public static ProbeResult CreateIndirect(int failedProbeCount, ProbeResultStatus status, IndirectProbeResponse indirectProbeResponse)
- => new ProbeResult(failedProbeCount, status, isDirectProbe: false, indirectProbeResponse.IntermediaryHealthScore);
+ => new(failedProbeCount, status, isDirectProbe: false, indirectProbeResponse.IntermediaryHealthScore);
public int FailedProbeCount { get; }
@@ -426,7 +421,7 @@ public static ProbeResult CreateIndirect(int failedProbeCount, ProbeResultStatus
public int IntermediaryHealthDegradationScore { get; }
}
- public enum ProbeResultStatus : byte
+ public enum ProbeResultStatus
{
Unknown,
Failed,
diff --git a/test/NonSilo.Tests/Membership/SiloHealthMonitorTests.cs b/test/NonSilo.Tests/Membership/SiloHealthMonitorTests.cs
index bb4e0786d8..8e1635537c 100644
--- a/test/NonSilo.Tests/Membership/SiloHealthMonitorTests.cs
+++ b/test/NonSilo.Tests/Membership/SiloHealthMonitorTests.cs
@@ -5,9 +5,7 @@
using NonSilo.Tests.Utilities;
using NSubstitute;
using NSubstitute.ExceptionExtensions;
-using Orleans;
using Orleans.Configuration;
-using Orleans.Runtime;
using Orleans.Runtime.MembershipService;
using TestExtensions;
using Xunit;
@@ -117,7 +115,7 @@ private async Task Shutdown()
public async Task SiloHealthMonitor_SuccessfulProbe()
{
_prober.Probe(default, default).ReturnsForAnyArgs(Task.CompletedTask);
- _prober.ProbeIndirectly(default, default, default, default).ThrowsForAnyArgs(new InvalidOperationException("No"));
+ _prober.ProbeIndirectly(default, default, default, default).ThrowsAsyncForAnyArgs(new InvalidOperationException("No"));
_monitor.Start();
@@ -136,12 +134,12 @@ public async Task SiloHealthMonitor_SuccessfulProbe()
}
[Fact]
- public async Task SiloHealthMonitor_FailedProbe()
+ public async Task SiloHealthMonitor_FailedProbe_Timeout()
{
_clusterMembershipOptions.ProbeTimeout = TimeSpan.FromSeconds(2);
- _prober.Probe(default, default).ReturnsForAnyArgs(info => Task.Delay(TimeSpan.FromSeconds(3)));
- _prober.ProbeIndirectly(default, default, default, default).ThrowsForAnyArgs(new InvalidOperationException("No"));
+ _prober.Probe(default, default, default).ReturnsForAnyArgs(info => Task.Delay(TimeSpan.FromSeconds(30)));
+ _prober.ProbeIndirectly(default, default, default, default).ThrowsAsyncForAnyArgs(new InvalidOperationException("No"));
_monitor.Start();
// Let a timer complete
@@ -154,14 +152,30 @@ public async Task SiloHealthMonitor_FailedProbe()
Assert.True(probeResult.IsDirectProbe);
Assert.Equal(0, probeResult.IntermediaryHealthDegradationScore);
+ await Shutdown();
+ }
+
+ [Fact]
+ public async Task SiloHealthMonitor_FailedProbe_Exception()
+ {
+ _clusterMembershipOptions.ProbeTimeout = TimeSpan.FromSeconds(2);
+
+ _prober.Probe(default, default).ThrowsAsyncForAnyArgs(new Exception("nope"));
+ _prober.ProbeIndirectly(default, default, default, default).ThrowsAsyncForAnyArgs(new InvalidOperationException("No"));
+ _monitor.Start();
+
+ // Let a timer complete
+ var timerCall = await _timerCalls.Reader.ReadAsync();
+ timerCall.Completion.TrySetResult(true);
+
// Throw directly, instead of timing out the probe
- _prober.Probe(default, default).ThrowsForAnyArgs(new Exception("nope"));
+ _prober.WhenForAnyArgs(s => s.Probe(default, default)).Throw(new Exception("nope"));
timerCall = await _timerCalls.Reader.ReadAsync();
timerCall.Completion.TrySetResult(true);
- probeResult = await _probeResults.Reader.ReadAsync();
+ var probeResult = await _probeResults.Reader.ReadAsync();
Assert.Equal(ProbeResultStatus.Failed, probeResult.Status);
- Assert.Equal(2, probeResult.FailedProbeCount);
+ Assert.Equal(1, probeResult.FailedProbeCount);
Assert.True(probeResult.IsDirectProbe);
Assert.Equal(0, probeResult.IntermediaryHealthDegradationScore);
@@ -174,7 +188,7 @@ public async Task SiloHealthMonitor_Indirect_FailedProbe()
_clusterMembershipOptions.ProbeTimeout = TimeSpan.FromSeconds(2);
_clusterMembershipOptions.EnableIndirectProbes = true;
- _prober.Probe(default, default).ThrowsForAnyArgs(info => new Exception("nonono!"));
+ _prober.Probe(default, default).ThrowsAsyncForAnyArgs(info => new Exception("nonono!"));
_prober.ProbeIndirectly(default, default, default, default).ReturnsForAnyArgs(new IndirectProbeResponse
{
FailureMessage = "fail",