Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement incoming grain call filters for observers #9054

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/Orleans.Core/Core/ClientBuilderExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ public static IClientBuilder AddActivityPropagation(this IClientBuilder builder)
builder.Services.TryAddSingleton(DistributedContextPropagator.Current);

return builder
.AddOutgoingGrainCallFilter<ActivityPropagationOutgoingGrainCallFilter>();
.AddOutgoingGrainCallFilter<ActivityPropagationOutgoingGrainCallFilter>()
.AddIncomingGrainCallFilter<ActivityPropagationIncomingGrainCallFilter>();
}

/// <summary>
Expand Down
101 changes: 67 additions & 34 deletions src/Orleans.Core/Core/ClientBuilderGrainCallFilterExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,42 +1,75 @@
namespace Orleans.Hosting
namespace Orleans.Hosting;

/// <summary>
/// Extensions for configuring grain call filters.
/// </summary>
public static class ClientBuilderGrainCallFilterExtensions
{
/// <summary>
/// Extensions for configuring grain call filters.
/// Adds an <see cref="IIncomingGrainCallFilter"/> to the filter pipeline.
/// </summary>
/// <param name="builder">The builder.</param>
/// <param name="filter">The filter.</param>
/// <returns>The builder.</returns>
public static IClientBuilder AddIncomingGrainCallFilter(this IClientBuilder builder, IIncomingGrainCallFilter filter)
{
return builder.ConfigureServices(services => services.AddIncomingGrainCallFilter(filter));
}

/// <summary>
/// Adds an <see cref="IIncomingGrainCallFilter"/> to the filter pipeline.
/// </summary>
/// <typeparam name="TImplementation">The filter implementation type.</typeparam>
/// <param name="builder">The builder.</param>
/// <returns>The builder.</returns>
public static IClientBuilder AddIncomingGrainCallFilter<TImplementation>(this IClientBuilder builder)
where TImplementation : class, IIncomingGrainCallFilter
{
return builder.ConfigureServices(services => services.AddIncomingGrainCallFilter<TImplementation>());
}

/// <summary>
/// Adds an <see cref="IIncomingGrainCallFilter"/> to the filter pipeline via a delegate.
/// </summary>
/// <param name="builder">The builder.</param>
/// <param name="filter">The filter.</param>
/// <returns>The builder.</returns>
public static IClientBuilder AddIncomingGrainCallFilter(this IClientBuilder builder, IncomingGrainCallFilterDelegate filter)
{
return builder.ConfigureServices(services => services.AddIncomingGrainCallFilter(filter));
}

/// <summary>
/// Adds an <see cref="IOutgoingGrainCallFilter"/> to the filter pipeline.
/// </summary>
public static class ClientBuilderGrainCallFilterExtensions
/// <param name="builder">The builder.</param>
/// <param name="filter">The filter.</param>
/// <returns>The <see cref="IClientBuilder"/>.</returns>
public static IClientBuilder AddOutgoingGrainCallFilter(this IClientBuilder builder, IOutgoingGrainCallFilter filter)
{
/// <summary>
/// Adds an <see cref="IOutgoingGrainCallFilter"/> to the filter pipeline.
/// </summary>
/// <param name="builder">The builder.</param>
/// <param name="filter">The filter.</param>
/// <returns>The <see cref="IClientBuilder"/>.</returns>
public static IClientBuilder AddOutgoingGrainCallFilter(this IClientBuilder builder, IOutgoingGrainCallFilter filter)
{
return builder.ConfigureServices(services => services.AddOutgoingGrainCallFilter(filter));
}
return builder.ConfigureServices(services => services.AddOutgoingGrainCallFilter(filter));
}

/// <summary>
/// Adds an <see cref="IOutgoingGrainCallFilter"/> to the filter pipeline.
/// </summary>
/// <typeparam name="TImplementation">The filter implementation type.</typeparam>
/// <param name="builder">The builder.</param>
/// <returns>The <see cref="IClientBuilder"/>.</returns>
public static IClientBuilder AddOutgoingGrainCallFilter<TImplementation>(this IClientBuilder builder)
where TImplementation : class, IOutgoingGrainCallFilter
{
return builder.ConfigureServices(services => services.AddOutgoingGrainCallFilter<TImplementation>());
}
/// <summary>
/// Adds an <see cref="IOutgoingGrainCallFilter"/> to the filter pipeline.
/// </summary>
/// <typeparam name="TImplementation">The filter implementation type.</typeparam>
/// <param name="builder">The builder.</param>
/// <returns>The <see cref="IClientBuilder"/>.</returns>
public static IClientBuilder AddOutgoingGrainCallFilter<TImplementation>(this IClientBuilder builder)
where TImplementation : class, IOutgoingGrainCallFilter
{
return builder.ConfigureServices(services => services.AddOutgoingGrainCallFilter<TImplementation>());
}

/// <summary>
/// Adds an <see cref="IOutgoingGrainCallFilter"/> to the filter pipeline via a delegate.
/// </summary>
/// <param name="builder">The builder.</param>
/// <param name="filter">The filter.</param>
/// <returns>The <see cref="IClientBuilder"/>.</returns>
public static IClientBuilder AddOutgoingGrainCallFilter(this IClientBuilder builder, OutgoingGrainCallFilterDelegate filter)
{
return builder.ConfigureServices(services => services.AddOutgoingGrainCallFilter(filter));
}
/// <summary>
/// Adds an <see cref="IOutgoingGrainCallFilter"/> to the filter pipeline via a delegate.
/// </summary>
/// <param name="builder">The builder.</param>
/// <param name="filter">The filter.</param>
/// <returns>The <see cref="IClientBuilder"/>.</returns>
public static IClientBuilder AddOutgoingGrainCallFilter(this IClientBuilder builder, OutgoingGrainCallFilterDelegate filter)
{
return builder.ConfigureServices(services => services.AddOutgoingGrainCallFilter(filter));
}
}
1 change: 1 addition & 0 deletions src/Orleans.Core/Core/DefaultClientServices.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public static void AddDefaultServices(IClientBuilder builder)
services.TryAddSingleton<GrainBindingsResolver>();
services.TryAddSingleton<LocalClientDetails>();
services.TryAddSingleton<OutsideRuntimeClient>();
services.TryAddSingleton<InterfaceToImplementationMappingCache>();
services.TryAddSingleton<ClientGrainContext>();
services.AddFromExisting<IGrainContextAccessor, ClientGrainContext>();
services.TryAddFromExisting<IRuntimeClient, OutsideRuntimeClient>();
Expand Down
27 changes: 26 additions & 1 deletion src/Orleans.Core/Runtime/InvokableObjectManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Orleans.Runtime;
using Orleans.Serialization;
Expand All @@ -14,23 +15,34 @@ internal class InvokableObjectManager : IDisposable
{
private readonly CancellationTokenSource disposed = new CancellationTokenSource();
private readonly ConcurrentDictionary<ObserverGrainId, LocalObjectData> localObjects = new ConcurrentDictionary<ObserverGrainId, LocalObjectData>();

private readonly InterfaceToImplementationMappingCache _interfaceToImplementationMapping;
private readonly IGrainContext rootGrainContext;
private readonly IRuntimeClient runtimeClient;
private readonly ILogger logger;
private readonly DeepCopier deepCopier;
private readonly DeepCopier<Response> _responseCopier;
private readonly MessagingTrace messagingTrace;
private List<IIncomingGrainCallFilter> _grainCallFilters;

private List<IIncomingGrainCallFilter> GrainCallFilters
=> _grainCallFilters ??= new List<IIncomingGrainCallFilter>(runtimeClient.ServiceProvider.GetServices<IIncomingGrainCallFilter>());

public InvokableObjectManager(
IGrainContext rootGrainContext,
IRuntimeClient runtimeClient,
DeepCopier deepCopier,
MessagingTrace messagingTrace,
DeepCopier<Response> responseCopier,
InterfaceToImplementationMappingCache interfaceToImplementationMapping,
ILogger logger)
{
this.rootGrainContext = rootGrainContext;
this.runtimeClient = runtimeClient;
this.deepCopier = deepCopier;
this.messagingTrace = messagingTrace;
_responseCopier = responseCopier;
_interfaceToImplementationMapping = interfaceToImplementationMapping;
this.logger = logger;
}

Expand Down Expand Up @@ -246,7 +258,20 @@ private async Task LocalObjectMessagePumpAsync()
try
{
request.SetTarget(this);
var response = await request.Invoke();
var filters = _manager.GrainCallFilters;
Response response;
if (filters is { Count: > 0 } || LocalObject is IIncomingGrainCallFilter)
{
var invoker = new GrainMethodInvoker(message, this, request, filters, _manager._interfaceToImplementationMapping, _manager._responseCopier);
await invoker.Invoke();
response = invoker.Response;
}
else
{
response = await request.Invoke();
response = _manager._responseCopier.Copy(response);
}

if (message.Direction != Message.Directions.OneWay)
{
this.SendResponseAsync(message, response);
Expand Down
16 changes: 10 additions & 6 deletions src/Orleans.Core/Runtime/OutsideRuntimeClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
using Orleans.Runtime;
using Orleans.Serialization;
using Orleans.Serialization.Invocation;
using Orleans.Serialization.Serializers;
using static Orleans.Internal.StandardExtensions;

namespace Orleans
Expand All @@ -29,6 +30,7 @@ internal class OutsideRuntimeClient : IRuntimeClient, IDisposable, IClusterConne
private bool disposed;

private readonly MessagingTrace messagingTrace;
private readonly InterfaceToImplementationMappingCache _interfaceToImplementationMapping;

public IInternalGrainFactory InternalGrainFactory { get; private set; }

Expand Down Expand Up @@ -64,9 +66,11 @@ public OutsideRuntimeClient(
IOptions<ClientMessagingOptions> clientMessagingOptions,
MessagingTrace messagingTrace,
IServiceProvider serviceProvider,
TimeProvider timeProvider)
TimeProvider timeProvider,
InterfaceToImplementationMappingCache interfaceToImplementationMapping)
{
TimeProvider = timeProvider;
_interfaceToImplementationMapping = interfaceToImplementationMapping;
this.ServiceProvider = serviceProvider;
_localClientDetails = localClientDetails;
this.loggerFactory = loggerFactory;
Expand Down Expand Up @@ -105,14 +109,14 @@ internal void ConsumeServices()

this.InternalGrainFactory = this.ServiceProvider.GetRequiredService<IInternalGrainFactory>();
this.messageFactory = this.ServiceProvider.GetService<MessageFactory>();

var copier = this.ServiceProvider.GetRequiredService<DeepCopier>();
this.localObjects = new InvokableObjectManager(
ServiceProvider.GetRequiredService<ClientGrainContext>(),
this,
copier,
this.messagingTrace,
this.loggerFactory.CreateLogger<ClientGrainContext>());
ServiceProvider.GetRequiredService<DeepCopier>(),
messagingTrace,
ServiceProvider.GetRequiredService<DeepCopier<Response>>(),
_interfaceToImplementationMapping,
loggerFactory.CreateLogger<ClientGrainContext>());

this.callbackTimerTask = Task.Run(MonitorCallbackExpiry);

Expand Down
10 changes: 7 additions & 3 deletions src/Orleans.Runtime/Core/HostedClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
using Orleans.Internal;
using Orleans.Runtime.Messaging;
using Orleans.Serialization;
using Orleans.Serialization.Invocation;

namespace Orleans.Runtime
{
Expand All @@ -24,7 +25,7 @@ internal sealed class HostedClient : IGrainContext, IGrainExtensionBinder, IDisp
private readonly Channel<Message> incomingMessages;
private readonly IGrainReferenceRuntime grainReferenceRuntime;
private readonly InvokableObjectManager invokableObjects;
private readonly IRuntimeClient runtimeClient;
private readonly InsideRuntimeClient runtimeClient;
private readonly ILogger logger;
private readonly IInternalGrainFactory grainFactory;
private readonly MessageCenter siloMessageCenter;
Expand All @@ -36,15 +37,16 @@ internal sealed class HostedClient : IGrainContext, IGrainExtensionBinder, IDisp
private Task? messagePump;

public HostedClient(
IRuntimeClient runtimeClient,
InsideRuntimeClient runtimeClient,
ILocalSiloDetails siloDetails,
ILogger<HostedClient> logger,
IGrainReferenceRuntime grainReferenceRuntime,
IInternalGrainFactory grainFactory,
MessageCenter messageCenter,
MessagingTrace messagingTrace,
DeepCopier deepCopier,
GrainReferenceActivator referenceActivator)
GrainReferenceActivator referenceActivator,
InterfaceToImplementationMappingCache interfaceToImplementationMappingCache)
{
this.incomingMessages = Channel.CreateUnbounded<Message>(new UnboundedChannelOptions
{
Expand All @@ -61,6 +63,8 @@ public HostedClient(
runtimeClient,
deepCopier,
messagingTrace,
runtimeClient.ServiceProvider.GetRequiredService<DeepCopier<Response>>(),
interfaceToImplementationMappingCache,
logger);
this.siloMessageCenter = messageCenter;
this.messagingTrace = messagingTrace;
Expand Down
11 changes: 6 additions & 5 deletions src/Orleans.Runtime/Core/InsideRuntimeClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ internal sealed class InsideRuntimeClient : IRuntimeClient, ILifecycleParticipan
private readonly ILoggerFactory loggerFactory;
private readonly SiloMessagingOptions messagingOptions;
private readonly ConcurrentDictionary<(GrainId, CorrelationId), CallbackData> callbacks;
private readonly InterfaceToImplementationMappingCache interfaceToImplementationMapping;
private readonly SharedCallbackData sharedCallbackData;
private readonly SharedCallbackData systemSharedCallbackData;
private readonly PeriodicTimer callbackTimer;
Expand All @@ -38,10 +39,9 @@ internal sealed class InsideRuntimeClient : IRuntimeClient, ILifecycleParticipan
private MessageCenter messageCenter;
private List<IIncomingGrainCallFilter> grainCallFilters;
private readonly DeepCopier _deepCopier;
private readonly InterfaceToImplementationMappingCache interfaceToImplementationMapping;
private HostedClient hostedClient;

private HostedClient HostedClient => this.hostedClient ?? (this.hostedClient = this.ServiceProvider.GetRequiredService<HostedClient>());
private HostedClient HostedClient => this.hostedClient ??= this.ServiceProvider.GetRequiredService<HostedClient>();
private readonly MessageFactory messageFactory;
private IGrainReferenceRuntime grainReferenceRuntime;
private Task callbackTimerTask;
Expand All @@ -59,10 +59,11 @@ public InsideRuntimeClient(
GrainInterfaceTypeResolver interfaceIdResolver,
GrainInterfaceTypeToGrainTypeResolver interfaceToTypeResolver,
DeepCopier deepCopier,
TimeProvider timeProvider)
TimeProvider timeProvider,
InterfaceToImplementationMappingCache interfaceToImplementationMapping)
{
TimeProvider = timeProvider;
this.interfaceToImplementationMapping = new InterfaceToImplementationMappingCache();
this.interfaceToImplementationMapping = interfaceToImplementationMapping;
this._deepCopier = deepCopier;
this.ServiceProvider = serviceProvider;
this.MySilo = siloDetails.SiloAddress;
Expand Down Expand Up @@ -103,7 +104,7 @@ private GrainLocator GrainLocator
=> this.grainLocator ?? (this.grainLocator = this.ServiceProvider.GetRequiredService<GrainLocator>());

private List<IIncomingGrainCallFilter> GrainCallFilters
=> this.grainCallFilters ?? (this.grainCallFilters = new List<IIncomingGrainCallFilter>(this.ServiceProvider.GetServices<IIncomingGrainCallFilter>()));
=> this.grainCallFilters ??= new List<IIncomingGrainCallFilter>(this.ServiceProvider.GetServices<IIncomingGrainCallFilter>());

private MessageCenter MessageCenter => this.messageCenter ?? (this.messageCenter = this.ServiceProvider.GetRequiredService<MessageCenter>());

Expand Down
1 change: 1 addition & 0 deletions src/Orleans.Runtime/Hosting/DefaultSiloServices.cs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ internal static void AddDefaultServices(ISiloBuilder builder)
services.AddTransient<CancellationSourcesExtension>();
services.AddKeyedTransient<IGrainExtension>(typeof(ICancellationSourcesExtension), (sp, _) => sp.GetRequiredService<CancellationSourcesExtension>());
services.TryAddSingleton<GrainFactory>(sp => sp.GetRequiredService<InsideRuntimeClient>().ConcreteGrainFactory);
services.TryAddSingleton<InterfaceToImplementationMappingCache>();
services.TryAddSingleton<GrainInterfaceTypeToGrainTypeResolver>();
services.TryAddFromExisting<IGrainFactory, GrainFactory>();
services.TryAddFromExisting<IInternalGrainFactory, GrainFactory>();
Expand Down
Loading
Loading