diff --git a/Orleans.sln b/Orleans.sln index 4df07f0c88..b9967bcc95 100644 --- a/Orleans.sln +++ b/Orleans.sln @@ -77,8 +77,6 @@ Project("{6EC3EE1D-3C4E-46DD-8F32-0CC8E7565705}") = "TestFSharp", "test\Grains\T EndProject Project("{6EC3EE1D-3C4E-46DD-8F32-0CC8E7565705}") = "TestFSharpInterfaces", "test\Misc\TestFSharpInterfaces\TestFSharpInterfaces.fsproj", "{A4F61392-36A3-457C-80D0-9CDC48F5922F}" EndProject -Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Orleans.Streaming.GCP", "src\Orleans.Streaming.GCP\Orleans.Streaming.GCP.csproj", "{6E5860C5-44E7-415C-80D6-3ECF15A80796}" -EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "TestInternalGrainInterfaces", "test\Grains\TestInternalGrainInterfaces\TestInternalGrainInterfaces.csproj", "{EBD697E3-91BE-4844-B3F0-6997300A8C12}" EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Orleans.Core.Abstractions", "src\Orleans.Core.Abstractions\Orleans.Core.Abstractions.csproj", "{73514686-D25D-478B-9943-A86F6B0F3A37}" @@ -221,7 +219,7 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Orleans.Serialization.Proto EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Orleans.Streaming.AdoNet", "src\AdoNet\Orleans.Streaming.AdoNet\Orleans.Streaming.AdoNet.csproj", "{2B994F33-16CF-4679-936A-5AEABC529D2C}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Benchmarks.AdoNet", "test\Benchmarks.AdoNet\Benchmarks.AdoNet.csproj", "{B8F43537-2D2E-42A0-BE67-5E07E4313AEA}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Benchmarks.AdoNet", "test\Benchmarks.AdoNet\Benchmarks.AdoNet.csproj", "{B8F43537-2D2E-42A0-BE67-5E07E4313AEA}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -361,10 +359,6 @@ Global {A4F61392-36A3-457C-80D0-9CDC48F5922F}.Debug|Any CPU.Build.0 = Debug|Any CPU {A4F61392-36A3-457C-80D0-9CDC48F5922F}.Release|Any CPU.ActiveCfg = Release|Any CPU {A4F61392-36A3-457C-80D0-9CDC48F5922F}.Release|Any CPU.Build.0 = Release|Any CPU - {6E5860C5-44E7-415C-80D6-3ECF15A80796}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {6E5860C5-44E7-415C-80D6-3ECF15A80796}.Debug|Any CPU.Build.0 = Debug|Any CPU - {6E5860C5-44E7-415C-80D6-3ECF15A80796}.Release|Any CPU.ActiveCfg = Release|Any CPU - {6E5860C5-44E7-415C-80D6-3ECF15A80796}.Release|Any CPU.Build.0 = Release|Any CPU {EBD697E3-91BE-4844-B3F0-6997300A8C12}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {EBD697E3-91BE-4844-B3F0-6997300A8C12}.Debug|Any CPU.Build.0 = Debug|Any CPU {EBD697E3-91BE-4844-B3F0-6997300A8C12}.Release|Any CPU.ActiveCfg = Release|Any CPU @@ -633,7 +627,6 @@ Global {6BA81672-10EA-4DA7-A620-3D60619FD39E} = {082D25DB-70CA-48F4-93E0-EC3455F494B8} {8A652779-85EF-48E2-A639-1EED3CE2C39C} = {2A128E88-B281-4BFB-ADEB-E515437F2385} {A4F61392-36A3-457C-80D0-9CDC48F5922F} = {70BCC54E-1618-4742-A079-07588065E361} - {6E5860C5-44E7-415C-80D6-3ECF15A80796} = {FE2E08C6-9C3B-4AEE-AE07-CCA387580D7A} {EBD697E3-91BE-4844-B3F0-6997300A8C12} = {2A128E88-B281-4BFB-ADEB-E515437F2385} {73514686-D25D-478B-9943-A86F6B0F3A37} = {4CD3AA9E-D937-48CA-BB6C-158E12257D23} {072B1B88-FE98-4354-86FA-AB6EF80EB9C4} = {3189037B-208D-40A1-A561-169D77D9BB5A} diff --git a/src/Orleans.Streaming.GCP/GoogleErrorCodes.cs b/src/Orleans.Streaming.GCP/GoogleErrorCodes.cs deleted file mode 100644 index ca736803db..0000000000 --- a/src/Orleans.Streaming.GCP/GoogleErrorCodes.cs +++ /dev/null @@ -1,48 +0,0 @@ -using System; -using Microsoft.Extensions.Logging; -namespace Orleans.Providers.GCP -{ - internal enum GoogleErrorCode - { - GoogleErrorCodeBase = 1 << 24, - Initializing = GoogleErrorCodeBase + 1, - DeleteTopic = GoogleErrorCodeBase + 2, - PublishMessage = GoogleErrorCodeBase + 3, - GetMessages = GoogleErrorCodeBase + 4, - DeleteMessage = GoogleErrorCodeBase + 5, - AcknowledgeMessage = GoogleErrorCodeBase + 6 - } - - internal static class LoggerExtensions - { - internal static void Debug(this ILogger logger, GoogleErrorCode errorCode, string format, params object[] args) - { - logger.LogDebug((int)errorCode, format, args); - } - - internal static void Trace(this ILogger logger, GoogleErrorCode errorCode, string format, params object[] args) - { - logger.LogTrace((int)errorCode, format, args); - } - - internal static void Info(this ILogger logger, GoogleErrorCode errorCode, string format, params object[] args) - { - logger.LogInformation((int)errorCode, format, args); - } - - internal static void Warn(this ILogger logger, GoogleErrorCode errorCode, string format, params object[] args) - { - logger.LogWarning((int)errorCode, format, args); - } - - internal static void Warn(this ILogger logger, GoogleErrorCode errorCode, string message, Exception exception) - { - logger.LogWarning((int)errorCode, exception, message); - } - - internal static void Error(this ILogger logger, GoogleErrorCode errorCode, string message, Exception exception = null) - { - logger.LogError((int)errorCode, exception, message); - } - } -} diff --git a/src/Orleans.Streaming.GCP/Hosting/ClientBuilderExtensions.cs b/src/Orleans.Streaming.GCP/Hosting/ClientBuilderExtensions.cs deleted file mode 100644 index a13f019488..0000000000 --- a/src/Orleans.Streaming.GCP/Hosting/ClientBuilderExtensions.cs +++ /dev/null @@ -1,34 +0,0 @@ -using System; -using Orleans.Configuration; -using Orleans.Providers.GCP.Streams.PubSub; - -namespace Orleans.Hosting -{ - public static class ClientBuilderExtensions - { - /// - /// Configure cluster client to use PubSub persistent streams. - /// - public static IClientBuilder AddPubSubStreams( - this IClientBuilder builder, - string name, Action configurePubSub) - where TDataAdapter : IPubSubDataAdapter - { - builder.AddPubSubStreams(name, b=>b.ConfigurePubSub(ob => ob.Configure(configurePubSub))); - return builder; - } - - /// - /// Configure cluster client to use PubSub persistent streams. - /// - public static IClientBuilder AddPubSubStreams( - this IClientBuilder builder, - string name, Action> configure) - where TDataAdapter : IPubSubDataAdapter - { - var configurator = new ClusterClientPubSubStreamConfigurator(name, builder); - configure?.Invoke(configurator); - return builder; - } - } -} diff --git a/src/Orleans.Streaming.GCP/Hosting/SiloBuilderExtensions.cs b/src/Orleans.Streaming.GCP/Hosting/SiloBuilderExtensions.cs deleted file mode 100644 index 6cac331588..0000000000 --- a/src/Orleans.Streaming.GCP/Hosting/SiloBuilderExtensions.cs +++ /dev/null @@ -1,36 +0,0 @@ -using System; -using Orleans.Configuration; -using Orleans.Providers.GCP.Streams.PubSub; - -namespace Orleans.Hosting -{ - public static class SiloBuilderExtensions - { - /// - /// Configure silo to use PubSub persistent streams. - /// - public static ISiloBuilder AddPubSubStreams( - this ISiloBuilder builder, - string name, Action configurePubSub) - where TDataAdapter : IPubSubDataAdapter - { - builder.AddPubSubStreams(name, b=> - b.ConfigurePubSub(ob => ob.Configure(configurePubSub))); - return builder; - } - - /// - /// Configure silo to use PubSub persistent streams. - /// - public static ISiloBuilder AddPubSubStreams( - this ISiloBuilder builder, - string name, Action> configure) - where TDataAdapter : IPubSubDataAdapter - { - var configurator = new SiloPubSubStreamConfigurator(name, - configureServicesDelegate => builder.ConfigureServices(configureServicesDelegate)); - configure?.Invoke(configurator); - return builder; - } - } -} \ No newline at end of file diff --git a/src/Orleans.Streaming.GCP/Orleans.Streaming.GCP.csproj b/src/Orleans.Streaming.GCP/Orleans.Streaming.GCP.csproj deleted file mode 100644 index a3c65f94e9..0000000000 --- a/src/Orleans.Streaming.GCP/Orleans.Streaming.GCP.csproj +++ /dev/null @@ -1,22 +0,0 @@ - - - Microsoft.Orleans.Streaming.GCP - Microsoft Orleans Google Cloud Platform Streaming Provider - Microsoft Orleans stream provider for Google Cloud Platform PubSub. - $(PackageTags) - $(DefaultTargetFrameworks) - Orleans.Streaming.GCP - Orleans.Providers.GCP - - - - - false - - - - - - - - diff --git a/src/Orleans.Streaming.GCP/Providers/Streams/PubSub/IPubSubDataAdapter.cs b/src/Orleans.Streaming.GCP/Providers/Streams/PubSub/IPubSubDataAdapter.cs deleted file mode 100644 index a1011ebbf5..0000000000 --- a/src/Orleans.Streaming.GCP/Providers/Streams/PubSub/IPubSubDataAdapter.cs +++ /dev/null @@ -1,64 +0,0 @@ -using Google.Cloud.PubSub.V1; -using Google.Protobuf; -using Microsoft.Extensions.DependencyInjection; -using Orleans.Providers.Streams.Common; -using Orleans.Runtime; -using Orleans.Serialization; -using Orleans.Streams; -using System.Collections.Generic; -using System.Linq; - -namespace Orleans.Providers.GCP.Streams.PubSub -{ - /// - /// Converts event data to and from cloud queue message - /// - public interface IPubSubDataAdapter - { - /// - /// Creates a from stream event data. - /// - PubsubMessage ToPubSubMessage(StreamId streamId, IEnumerable events, Dictionary requestContext); - - /// - /// Creates a batch container from a message - /// - IBatchContainer FromPullResponseMessage(PubsubMessage msg, long sequenceId); - } - - [SerializationCallbacks(typeof(OnDeserializedCallbacks))] - public class PubSubDataAdapter : IPubSubDataAdapter, IOnDeserialized - { - private Serializer _serializer; - - /// - /// Initializes a new instance of the class. - /// - public PubSubDataAdapter(Serializer serializer) - { - _serializer = serializer; - } - - /// - public IBatchContainer FromPullResponseMessage(PubsubMessage msg, long sequenceId) - { - var batchContainer = _serializer.Deserialize(msg.Data.ToByteArray()); - batchContainer.RealSequenceToken = new EventSequenceTokenV2(sequenceId); - return batchContainer; - } - - /// - public PubsubMessage ToPubSubMessage(StreamId streamId, IEnumerable events, Dictionary requestContext) - { - var batchMessage = new PubSubBatchContainer(streamId, events.Cast().ToList(), requestContext); - var rawBytes = _serializer.SerializeToArray(batchMessage); - - return new PubsubMessage { Data = ByteString.CopyFrom(rawBytes) }; - } - - void IOnDeserialized.OnDeserialized(DeserializationContext context) - { - _serializer = context.ServiceProvider.GetRequiredService>(); - } - } -} diff --git a/src/Orleans.Streaming.GCP/Providers/Streams/PubSub/PubSubAdapter.cs b/src/Orleans.Streaming.GCP/Providers/Streams/PubSub/PubSubAdapter.cs deleted file mode 100644 index f4c517a509..0000000000 --- a/src/Orleans.Streaming.GCP/Providers/Streams/PubSub/PubSubAdapter.cs +++ /dev/null @@ -1,77 +0,0 @@ -using Orleans.Runtime; -using Orleans.Streams; -using System; -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Threading.Tasks; -using Microsoft.Extensions.Logging; - -namespace Orleans.Providers.GCP.Streams.PubSub -{ - internal class PubSubAdapter : IQueueAdapter - where TDataAdapter : IPubSubDataAdapter - { - protected readonly string ServiceId; - protected readonly string ProjectId; - protected readonly string TopicId; - protected readonly TimeSpan? Deadline; - private readonly HashRingBasedStreamQueueMapper _streamQueueMapper; - protected readonly ConcurrentDictionary Subscriptions = new ConcurrentDictionary(); - protected readonly IPubSubDataAdapter _dataAdapter; - private readonly ILogger _logger; - private readonly ILoggerFactory loggerFactory; - private readonly string _customEndpoint; - public string Name { get; } - public bool IsRewindable => false; - public StreamProviderDirection Direction => StreamProviderDirection.ReadWrite; - - public PubSubAdapter( - TDataAdapter dataAdapter, - ILoggerFactory loggerFactory, - HashRingBasedStreamQueueMapper streamQueueMapper, - string projectId, - string topicId, - string serviceId, - string providerName, - TimeSpan? deadline = null, - string customEndpoint = "") - { - if (string.IsNullOrEmpty(projectId)) throw new ArgumentNullException(nameof(projectId)); - if (string.IsNullOrEmpty(topicId)) throw new ArgumentNullException(nameof(topicId)); - if (string.IsNullOrEmpty(serviceId)) throw new ArgumentNullException(nameof(serviceId)); - - _logger = loggerFactory.CreateLogger($"{this.GetType().FullName}.{providerName}"); - this.loggerFactory = loggerFactory; - ProjectId = projectId; - TopicId = topicId; - ServiceId = serviceId; - Name = providerName; - Deadline = deadline; - _streamQueueMapper = streamQueueMapper; - _dataAdapter = dataAdapter; - _customEndpoint = customEndpoint; - } - - public IQueueAdapterReceiver CreateReceiver(QueueId queueId) - { - return PubSubAdapterReceiver.Create(this.loggerFactory, queueId, ProjectId, TopicId, ServiceId, _dataAdapter, Deadline, _customEndpoint); - } - - public async Task QueueMessageBatchAsync(StreamId streamId, IEnumerable events, StreamSequenceToken token, Dictionary requestContext) - { - if (token != null) throw new ArgumentException("Google PubSub stream provider currently does not support non-null StreamSequenceToken.", nameof(token)); - var queueId = _streamQueueMapper.GetQueueForStream(streamId); - - PubSubDataManager pubSub; - if (!Subscriptions.TryGetValue(queueId, out pubSub)) - { - var tmpPubSub = new PubSubDataManager(this.loggerFactory, ProjectId, TopicId, queueId.ToString(), ServiceId, Deadline); - await tmpPubSub.Initialize(); - pubSub = Subscriptions.GetOrAdd(queueId, tmpPubSub); - } - - var msg = _dataAdapter.ToPubSubMessage(streamId, events, requestContext); - await pubSub.PublishMessages(new[] { msg }); - } - } -} diff --git a/src/Orleans.Streaming.GCP/Providers/Streams/PubSub/PubSubAdapterFactory.cs b/src/Orleans.Streaming.GCP/Providers/Streams/PubSub/PubSubAdapterFactory.cs deleted file mode 100644 index 299ebf34f5..0000000000 --- a/src/Orleans.Streaming.GCP/Providers/Streams/PubSub/PubSubAdapterFactory.cs +++ /dev/null @@ -1,81 +0,0 @@ -using System; -using System.Threading.Tasks; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Options; -using Microsoft.Extensions.Logging; -using Orleans.Providers.Streams.Common; -using Orleans.Streams; -using Orleans.Configuration; -using Orleans.Configuration.Overrides; - -namespace Orleans.Providers.GCP.Streams.PubSub -{ - public class PubSubAdapterFactory : IQueueAdapterFactory - where TDataAdapter : IPubSubDataAdapter - { - private readonly string _providerName; - private readonly PubSubOptions options; - private readonly ClusterOptions clusterOptions; - private readonly ILoggerFactory loggerFactory; - private readonly Func _adaptorFactory; - private readonly HashRingBasedStreamQueueMapper _streamQueueMapper; - private readonly IQueueAdapterCache _adapterCache; - - /// - /// Application level failure handler override. - /// - protected Func> StreamFailureHandlerFactory { private get; set; } - - public PubSubAdapterFactory( - string name, - PubSubOptions options, - HashRingStreamQueueMapperOptions queueMapperOptions, - SimpleQueueCacheOptions cacheOptions, - IServiceProvider serviceProvider, - IOptions clusterOptions, - ILoggerFactory loggerFactory) - { - this._providerName = name; - this.options = options; - this.clusterOptions = clusterOptions.Value; - this.loggerFactory = loggerFactory; - this._adaptorFactory = () => ActivatorUtilities.GetServiceOrCreateInstance(serviceProvider); - this._streamQueueMapper = new HashRingBasedStreamQueueMapper(queueMapperOptions, this._providerName); - this._adapterCache = new SimpleQueueAdapterCache(cacheOptions, this._providerName, loggerFactory); - } - - public virtual void Init() - { - if (StreamFailureHandlerFactory == null) - { - StreamFailureHandlerFactory = - qid => Task.FromResult(new NoOpStreamDeliveryFailureHandler()); - } - - } - - public virtual Task CreateAdapter() - { - var adapter = new PubSubAdapter(_adaptorFactory(), this.loggerFactory, _streamQueueMapper, - this.options.ProjectId, this.options.TopicId, this.clusterOptions.ServiceId, this._providerName, this.options.Deadline, this.options.CustomEndpoint); - return Task.FromResult(adapter); - } - - public Task GetDeliveryFailureHandler(QueueId queueId) => StreamFailureHandlerFactory(queueId); - - public IQueueAdapterCache GetQueueAdapterCache() => _adapterCache; - - public IStreamQueueMapper GetStreamQueueMapper() => _streamQueueMapper; - - public static PubSubAdapterFactory Create(IServiceProvider services, string name) - { - var pubsubOptions = services.GetOptionsByName(name); - var cacheOptions = services.GetOptionsByName(name); - var queueMapperOptions = services.GetOptionsByName(name); - IOptions clusterOptions = services.GetProviderClusterOptions(name); - var factory = ActivatorUtilities.CreateInstance>(services, name, pubsubOptions, queueMapperOptions, cacheOptions, clusterOptions); - factory.Init(); - return factory; - } - } -} diff --git a/src/Orleans.Streaming.GCP/Providers/Streams/PubSub/PubSubAdapterReceiver.cs b/src/Orleans.Streaming.GCP/Providers/Streams/PubSub/PubSubAdapterReceiver.cs deleted file mode 100644 index d336863f52..0000000000 --- a/src/Orleans.Streaming.GCP/Providers/Streams/PubSub/PubSubAdapterReceiver.cs +++ /dev/null @@ -1,153 +0,0 @@ -using Google.Cloud.PubSub.V1; -using Orleans.Streams; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Threading.Tasks; -using Microsoft.Extensions.Logging; - -namespace Orleans.Providers.GCP.Streams.PubSub -{ - public class PubSubAdapterReceiver : IQueueAdapterReceiver - { - private PubSubDataManager _pubSub; - private long _lastReadMessage; - private Task _outstandingTask; - private readonly ILogger _logger; - private readonly IPubSubDataAdapter _dataAdapter; - private readonly List _pending; - - public QueueId Id { get; } - - public static IQueueAdapterReceiver Create(ILoggerFactory loggerFactory, QueueId queueId, string projectId, string topicId, - string serviceId, IPubSubDataAdapter dataAdapter, TimeSpan? deadline = null, string customEndpoint = "") - { - if (queueId.IsDefault) throw new ArgumentNullException(nameof(queueId)); - if (dataAdapter == null) throw new ArgumentNullException(nameof(dataAdapter)); - - var pubSub = new PubSubDataManager(loggerFactory, projectId, topicId, queueId.ToString(), serviceId, deadline, customEndpoint); - return new PubSubAdapterReceiver(loggerFactory, queueId, topicId, pubSub, dataAdapter); - } - - private PubSubAdapterReceiver(ILoggerFactory loggerFactory, QueueId queueId, string topicId, PubSubDataManager pubSub, IPubSubDataAdapter dataAdapter) - { - if (queueId.IsDefault) throw new ArgumentNullException(nameof(queueId)); - Id = queueId; - if (pubSub == null) throw new ArgumentNullException(nameof(pubSub)); - _pubSub = pubSub; - - if (dataAdapter == null) throw new ArgumentNullException(nameof(dataAdapter)); - _dataAdapter = dataAdapter; - - _logger = loggerFactory.CreateLogger($"{this.GetType().FullName}.{topicId}.{queueId}"); - _pending = new List(); - } - - public Task Initialize(TimeSpan timeout) - { - if (_pubSub != null) return _pubSub.Initialize(); - - return Task.CompletedTask; - } - - public async Task Shutdown(TimeSpan timeout) - { - try - { - // await the last pending operation, so after we shutdown and stop this receiver we don't get async operation completions from pending operations. - if (_outstandingTask != null) - await _outstandingTask; - } - finally - { - // remember that we shut down so we never try to read from the queue again. - _pubSub = null; - } - } - - public async Task> GetQueueMessagesAsync(int maxCount) - { - try - { - var pubSubRef = _pubSub; // store direct ref, in case we are somehow asked to shutdown while we are receiving. - if (pubSubRef == null) return new List(); - - var task = pubSubRef.GetMessages(maxCount); - _outstandingTask = task; - IEnumerable messages = await task; - - List pubSubMessages = new List(); - foreach (var message in messages) - { - IBatchContainer container = _dataAdapter.FromPullResponseMessage(message.Message, _lastReadMessage++); - pubSubMessages.Add(container); - _pending.Add(new PendingDelivery(container.SequenceToken, message)); - } - - return pubSubMessages; - } - finally - { - _outstandingTask = null; - } - } - - public async Task MessagesDeliveredAsync(IList messages) - { - try - { - var pubSubRef = _pubSub; // store direct ref, in case we are somehow asked to shutdown while we are receiving. - if (messages.Count == 0 || pubSubRef == null) return; - // get sequence tokens of delivered messages - List deliveredTokens = messages.Select(message => message.SequenceToken).ToList(); - // find oldest delivered message - StreamSequenceToken oldest = deliveredTokens.Max(); - // finalize all pending messages at or befor the oldest - List finalizedDeliveries = _pending - .Where(pendingDelivery => !pendingDelivery.Token.Newer(oldest)) - .ToList(); - if (finalizedDeliveries.Count == 0) return; - // remove all finalized deliveries from pending, regardless of if it was delivered or not. - _pending.RemoveRange(0, finalizedDeliveries.Count); - // get the queue messages for all finalized deliveries that were delivered. - List deliveredMessages = finalizedDeliveries - .Where(finalized => deliveredTokens.Contains(finalized.Token)) - .Select(finalized => finalized.Message) - .ToList(); - if (deliveredMessages.Count == 0) return; - // delete all delivered queue messages from the queue. Anything finalized but not delivered will show back up later - _outstandingTask = pubSubRef.AcknowledgeMessages(deliveredMessages); - - try - { - await _outstandingTask; - } - catch (Exception exc) - { - _logger.LogWarning( - (int)GoogleErrorCode.AcknowledgeMessage, - exc, - "Exception upon AcknowledgeMessages on queue {Id}. Ignoring.", - Id); - } - } - finally - { - _outstandingTask = null; - } - } - - private class PendingDelivery - { - public PendingDelivery(StreamSequenceToken token, ReceivedMessage message) - { - Token = token; - Message = message; - } - - public ReceivedMessage Message { get; } - - public StreamSequenceToken Token { get; } - } - } -} diff --git a/src/Orleans.Streaming.GCP/Providers/Streams/PubSub/PubSubBatchContainer.cs b/src/Orleans.Streaming.GCP/Providers/Streams/PubSub/PubSubBatchContainer.cs deleted file mode 100644 index 882a2ee9f4..0000000000 --- a/src/Orleans.Streaming.GCP/Providers/Streams/PubSub/PubSubBatchContainer.cs +++ /dev/null @@ -1,76 +0,0 @@ -using Newtonsoft.Json; -using Orleans.Providers.Streams.Common; -using Orleans.Runtime; -using Orleans.Streams; -using System; -using System.Collections.Generic; -using System.Linq; - -namespace Orleans.Providers.GCP.Streams.PubSub -{ - [Serializable] - [GenerateSerializer] - public class PubSubBatchContainer : IBatchContainer - { - [JsonProperty] - [Id(0)] - private EventSequenceTokenV2 sequenceToken; - - [JsonProperty] - [Id(1)] - private readonly List events; - - [JsonProperty] - [Id(2)] - private readonly Dictionary requestContext; - - [Id(3)] - public StreamId StreamId { get; } - - public StreamSequenceToken SequenceToken => sequenceToken; - - internal EventSequenceTokenV2 RealSequenceToken - { - set { sequenceToken = value; } - } - - [JsonConstructor] - public PubSubBatchContainer( - StreamId streamId, - List events, - Dictionary requestContext, - EventSequenceTokenV2 sequenceToken) - : this(streamId, events, requestContext) - { - this.sequenceToken = sequenceToken; - } - - public PubSubBatchContainer(StreamId streamId, List events, Dictionary requestContext) - { - StreamId = streamId; - if (events == null) throw new ArgumentNullException(nameof(events), "Message contains no events"); - this.events = events; - this.requestContext = requestContext; - } - - public IEnumerable> GetEvents() - { - return events.OfType().Select((e, i) => Tuple.Create(e, sequenceToken.CreateSequenceTokenForEvent(i))); - } - - public bool ImportRequestContext() - { - if (requestContext != null) - { - RequestContextExtensions.Import(requestContext); - return true; - } - return false; - } - - public override string ToString() - { - return $"[GooglePubSubBatchContainer:Stream={StreamId},#Items={events.Count}]"; - } - } -} diff --git a/src/Orleans.Streaming.GCP/Providers/Streams/PubSub/PubSubDataManager.cs b/src/Orleans.Streaming.GCP/Providers/Streams/PubSub/PubSubDataManager.cs deleted file mode 100644 index eb07500468..0000000000 --- a/src/Orleans.Streaming.GCP/Providers/Streams/PubSub/PubSubDataManager.cs +++ /dev/null @@ -1,206 +0,0 @@ -using Google.Api.Gax.Grpc; -using Google.Cloud.PubSub.V1; -using Grpc.Core; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Threading.Tasks; -using Microsoft.Extensions.Logging; -namespace Orleans.Providers.GCP.Streams.PubSub -{ - /// - /// Utility class to encapsulate access to Google PubSub APIs. - /// - /// Used by Google PubSub streaming provider. - public class PubSubDataManager - { - public const int MAX_PULLED_MESSAGES = 1000; - - public TopicName TopicName { get; private set; } - public SubscriptionName SubscriptionName { get; private set; } - - private Subscription _subscription; - private Topic _topic; - private PublisherClient _publisher; - private SubscriberClient _subscriber; - private readonly TimeSpan? _deadline; - private readonly ServiceEndpoint _customEndpoint; - - [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Security", "CA2104:DoNotDeclareReadOnlyMutableReferenceTypes")] - private readonly ILogger _logger; - - public PubSubDataManager(ILoggerFactory loggerFactory, string projectId, string topicId, string subscriptionId, string serviceId, TimeSpan? deadline = null, string customEndpoint = "") - { - if (string.IsNullOrWhiteSpace(serviceId)) throw new ArgumentNullException(nameof(serviceId)); - if (string.IsNullOrWhiteSpace(projectId)) throw new ArgumentNullException(nameof(projectId)); - if (string.IsNullOrWhiteSpace(topicId)) throw new ArgumentNullException(nameof(topicId)); - if (string.IsNullOrWhiteSpace(subscriptionId)) throw new ArgumentNullException(nameof(subscriptionId)); - - _logger = loggerFactory.CreateLogger(); - _deadline = deadline; - topicId = $"{topicId}-{serviceId}"; - subscriptionId = $"{projectId}-{serviceId}"; - TopicName = new TopicName(projectId, topicId); - SubscriptionName = new SubscriptionName(projectId, subscriptionId); - - if (!string.IsNullOrWhiteSpace(customEndpoint)) - { - var hostPort = customEndpoint.Split(new char[] { ':' }, StringSplitOptions.RemoveEmptyEntries); - if (hostPort.Length != 2) throw new ArgumentException(nameof(customEndpoint)); - - var host = hostPort[0]; - int port; - if (!int.TryParse(hostPort[1], out port)) throw new ArgumentException(nameof(customEndpoint)); - - _customEndpoint = new ServiceEndpoint(host, port); - } - } - - public async Task Initialize() - { - try - { - _publisher = await PublisherClient.CreateAsync(_customEndpoint); - } - catch (Exception e) - { - ReportErrorAndRethrow(e, "CreateAsync", GoogleErrorCode.Initializing); - } - - bool didCreate = false; - - try - { - _topic = await _publisher.CreateTopicAsync(TopicName); - didCreate = true; - } - catch (RpcException e) - { - if (e.Status.StatusCode != StatusCode.AlreadyExists) - ReportErrorAndRethrow(e, "CreateTopicAsync", GoogleErrorCode.Initializing); - - _topic = await _publisher.GetTopicAsync(TopicName); - } - - _logger.LogInformation((int)GoogleErrorCode.Initializing, "{Verb} Google PubSub Topic {TopicId}", (didCreate ? "Created" : "Attached to"), TopicName.TopicId); - - didCreate = false; - - try - { - _subscriber = await SubscriberClient.CreateAsync(_customEndpoint); - _subscription = await _subscriber.CreateSubscriptionAsync(SubscriptionName, TopicName, pushConfig: null, - ackDeadlineSeconds: _deadline.HasValue ? (int)_deadline.Value.TotalSeconds : 60); - didCreate = true; - } - catch (RpcException e) - { - if (e.Status.StatusCode != StatusCode.AlreadyExists) - ReportErrorAndRethrow(e, "CreateSubscriptionAsync", GoogleErrorCode.Initializing); - - _subscription = await _subscriber.GetSubscriptionAsync(SubscriptionName); - } - - _logger.LogInformation( - (int)GoogleErrorCode.Initializing, - "{Verb} Google PubSub Subscription {SubscriptionId} to Topic {TopicId}", - (didCreate ? "Created" : "Attached to"), - SubscriptionName.SubscriptionId, - TopicName.TopicId); - } - - public async Task DeleteTopic() - { - if (_logger.IsEnabled(LogLevel.Debug)) _logger.LogDebug("Deleting Google PubSub topic: {TopicId}", TopicName.TopicId); - try - { - await _publisher?.DeleteTopicAsync(TopicName); - _logger.LogInformation((int)GoogleErrorCode.Initializing, "Deleted Google PubSub topic {TopicId}", TopicName.TopicId); - } - catch (Exception exc) - { - ReportErrorAndRethrow(exc, "DeleteTopic", GoogleErrorCode.DeleteTopic); - } - } - - public async Task PublishMessages(IEnumerable messages) - { - var count = messages.Count(); - if (count < 1) return; - - if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("Publishing {Count} messages to topic {TopicId}", count, TopicName.TopicId); - - try - { - await _publisher?.PublishAsync(TopicName, messages); - } - catch (Exception exc) - { - ReportErrorAndRethrow(exc, "PublishMessage", GoogleErrorCode.PublishMessage); - } - } - - public async Task> GetMessages(int count = 1) - { - if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("Getting {Count} message(s) from Google PubSub topic {TopicId}", count, TopicName.TopicId); - - PullResponse response = null; - try - { - //According to Google, no more than 1000 messages can be published/received - response = await _subscriber.PullAsync(SubscriptionName, true, count < 1 ? MAX_PULLED_MESSAGES : count); - } - catch (Exception exc) - { - ReportErrorAndRethrow(exc, "GetMessages", GoogleErrorCode.GetMessages); - } - - if (_logger.IsEnabled(LogLevel.Trace)) - { - _logger.LogTrace("Received {Count} message(s) from Google PubSub topic {TopicId}", response.ReceivedMessages.Count, TopicName.TopicId); - - foreach (var received in response.ReceivedMessages) - { - _logger.LogTrace( - "Received message {MessageId} published {PublishedTime} from Google PubSub topic {TopicId}", - received.Message.MessageId, - received.Message.PublishTime.ToDateTime(), - TopicName.TopicId); - } - } - - return response.ReceivedMessages; - } - - public async Task AcknowledgeMessages(IEnumerable messages) - { - var count = messages.Count(); - if (count < 1) return; - - if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("Deleting {Count} message(s) from Google PubSub topic {TopicId}", count, TopicName.TopicId); - - try - { - await _subscriber.AcknowledgeAsync(SubscriptionName, messages.Select(m => m.AckId)); - } - catch (Exception exc) - { - ReportErrorAndRethrow(exc, "DeleteMessage", GoogleErrorCode.DeleteMessage); - } - } - - private void ReportErrorAndRethrow(Exception exc, string operation, GoogleErrorCode errorCode) - { - _logger.LogError( - (int)errorCode, - exc, - "Error doing {Operation} for Google Project {ProjectId} at PubSub Topic {TopicId} ", - operation, - TopicName.ProjectId, - TopicName.TopicId); - throw new AggregateException( - $"Error doing {operation} for Google Project {TopicName.ProjectId} at PubSub Topic {TopicName.TopicId} {Environment.NewLine}Exception = {exc}", - exc); - } - } -} diff --git a/src/Orleans.Streaming.GCP/Providers/Streams/PubSub/PubSubStreamConfigurator.cs b/src/Orleans.Streaming.GCP/Providers/Streams/PubSub/PubSubStreamConfigurator.cs deleted file mode 100644 index 9fca87c08b..0000000000 --- a/src/Orleans.Streaming.GCP/Providers/Streams/PubSub/PubSubStreamConfigurator.cs +++ /dev/null @@ -1,63 +0,0 @@ -using Orleans.Configuration; -using Orleans.Providers.GCP.Streams.PubSub; -using System; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Options; - -namespace Orleans.Hosting -{ - public class SiloPubSubStreamConfigurator : SiloPersistentStreamConfigurator - where TDataAdapter : IPubSubDataAdapter - { - public SiloPubSubStreamConfigurator(string name, Action> configureServicesDelegate) - : base(name, configureServicesDelegate, PubSubAdapterFactory.Create) - { - this.ConfigureDelegate(services => - { - services.ConfigureNamedOptionForLogging(name) - .ConfigureNamedOptionForLogging(name) - .ConfigureNamedOptionForLogging(name); - }); - } - - public SiloPubSubStreamConfigurator ConfigurePubSub(Action> configureOptions) - { - this.Configure(configureOptions); - return this; - } - - public SiloPubSubStreamConfigurator ConfigureCache(int cacheSize = SimpleQueueCacheOptions.DEFAULT_CACHE_SIZE) - { - this.Configure(ob => ob.Configure(options => options.CacheSize = cacheSize)); - return this; - } - - public SiloPubSubStreamConfigurator ConfigurePartitioning(int numOfPartitions = HashRingStreamQueueMapperOptions.DEFAULT_NUM_QUEUES) - { - this.Configure(ob => ob.Configure(options => options.TotalQueueCount = numOfPartitions)); - return this; - } - } - - public class ClusterClientPubSubStreamConfigurator : ClusterClientPersistentStreamConfigurator - where TDataAdapter : IPubSubDataAdapter - { - public ClusterClientPubSubStreamConfigurator(string name, IClientBuilder builder) - : base(name, builder, PubSubAdapterFactory.Create) - { - builder.ConfigureServices(services => services.ConfigureNamedOptionForLogging(name)); - } - - public ClusterClientPubSubStreamConfigurator ConfigurePubSub(Action> configureOptions) - { - this.Configure(configureOptions); - return this; - } - - public ClusterClientPubSubStreamConfigurator ConfigurePartitioning(int numOfPartitions = HashRingStreamQueueMapperOptions.DEFAULT_NUM_QUEUES) - { - this.Configure(ob => ob.Configure(options => options.TotalQueueCount = numOfPartitions)); - return this; - } - } -} diff --git a/src/Orleans.Streaming.GCP/Providers/Streams/PubSub/PubSubStreamOptions.cs b/src/Orleans.Streaming.GCP/Providers/Streams/PubSub/PubSubStreamOptions.cs deleted file mode 100644 index 9ad4396e5e..0000000000 --- a/src/Orleans.Streaming.GCP/Providers/Streams/PubSub/PubSubStreamOptions.cs +++ /dev/null @@ -1,21 +0,0 @@ -using System; - -namespace Orleans.Configuration -{ - public class PubSubOptions - { - public string ProjectId { get; set; } - - public string TopicId { get; set; } - - public string CustomEndpoint { get; set; } - - private TimeSpan? deadline; - public TimeSpan? Deadline - { - get { return this.deadline; } - set { this.deadline = (value.HasValue) ? TimeSpan.FromTicks(Math.Min(value.Value.Ticks, MAX_DEADLINE.Ticks)) : value; } - } - public static readonly TimeSpan MAX_DEADLINE = TimeSpan.FromSeconds(600); - } -} diff --git a/test/Extensions/GoogleUtils.Tests/App.config b/test/Extensions/GoogleUtils.Tests/App.config deleted file mode 100644 index 97da3bff4b..0000000000 --- a/test/Extensions/GoogleUtils.Tests/App.config +++ /dev/null @@ -1,8 +0,0 @@ - - - - - - - - \ No newline at end of file diff --git a/test/Extensions/GoogleUtils.Tests/CollectionFixtures.cs b/test/Extensions/GoogleUtils.Tests/CollectionFixtures.cs deleted file mode 100644 index 7aa531ac9b..0000000000 --- a/test/Extensions/GoogleUtils.Tests/CollectionFixtures.cs +++ /dev/null @@ -1,12 +0,0 @@ -using TestExtensions; -using Xunit; - -namespace GoogleUtils.Tests.Streaming -{ - // Assembly collections must be defined once in each assembly - [CollectionDefinition("DefaultCluster")] - public class DefaultClusterTestCollection : ICollectionFixture { } - - [CollectionDefinition(TestEnvironmentFixture.DefaultCollection)] - public class TestEnvironmentFixtureCollection : ICollectionFixture { } -} diff --git a/test/Extensions/GoogleUtils.Tests/GoogleTestUtils.cs b/test/Extensions/GoogleUtils.Tests/GoogleTestUtils.cs deleted file mode 100644 index e12eefd1fa..0000000000 --- a/test/Extensions/GoogleUtils.Tests/GoogleTestUtils.cs +++ /dev/null @@ -1,25 +0,0 @@ -using System; -using System.Net.Http; - -namespace GoogleUtils.Tests -{ - public static class GoogleTestUtils - { - public static string DeploymentId => Guid.NewGuid().ToString(); - public static string ProjectId => "feisty-flow-173313"; - public static string TopicId => "GPSTestUtilsTopicId"; - - public static Lazy IsPubSubSimulatorAvailable = new Lazy(() => - { - try - { - var ok = new HttpClient().GetAsync("http://localhost:8085").Result; - return ok.IsSuccessStatusCode; - } - catch - { - return false; - } - }); - } -} diff --git a/test/Extensions/GoogleUtils.Tests/GoogleUtils.Tests.csproj b/test/Extensions/GoogleUtils.Tests/GoogleUtils.Tests.csproj deleted file mode 100644 index 58688cf3be..0000000000 --- a/test/Extensions/GoogleUtils.Tests/GoogleUtils.Tests.csproj +++ /dev/null @@ -1,15 +0,0 @@ - - - $(TestTargetFrameworks) - true - - - - - - - - - - - diff --git a/test/Extensions/GoogleUtils.Tests/GoogleUtils.Tests.xunit.runner.json b/test/Extensions/GoogleUtils.Tests/GoogleUtils.Tests.xunit.runner.json deleted file mode 100644 index 0920af9c34..0000000000 --- a/test/Extensions/GoogleUtils.Tests/GoogleUtils.Tests.xunit.runner.json +++ /dev/null @@ -1,8 +0,0 @@ -{ - "appDomain": "ifAvailable", - "diagnosticMessages": true, - "parallelizeAssembly": false, - "parallelizeTestCollections": true, - "methodDisplay": "classAndMethod", - "shadowCopy": false -} diff --git a/test/Extensions/GoogleUtils.Tests/Properties/AssemblyInfo.cs b/test/Extensions/GoogleUtils.Tests/Properties/AssemblyInfo.cs deleted file mode 100644 index 2219b1e928..0000000000 --- a/test/Extensions/GoogleUtils.Tests/Properties/AssemblyInfo.cs +++ /dev/null @@ -1,4 +0,0 @@ -using Xunit; - -// Disable XUnit concurrency limit -[assembly: CollectionBehavior(MaxParallelThreads = -1)] diff --git a/test/Extensions/GoogleUtils.Tests/Streaming/PubSubClientStreamTests.cs b/test/Extensions/GoogleUtils.Tests/Streaming/PubSubClientStreamTests.cs deleted file mode 100644 index 9680b299af..0000000000 --- a/test/Extensions/GoogleUtils.Tests/Streaming/PubSubClientStreamTests.cs +++ /dev/null @@ -1,92 +0,0 @@ -using System; -using System.Threading.Tasks; -using Orleans.Runtime; -using Orleans.TestingHost; -using Tester.StreamingTests; -using TestExtensions; -using Xunit; -using Xunit.Abstractions; -using Orleans.Providers.GCP.Streams.PubSub; -using Orleans.Hosting; -using Microsoft.Extensions.Configuration; -using Orleans; -using Orleans.Configuration; - -namespace GoogleUtils.Tests.Streaming -{ - [TestCategory("GCP"), TestCategory("PubSub")] - public class PubSubClientStreamTests : TestClusterPerTest - { - private const string PROVIDER_NAME = "PubSubProvider"; - private const string STREAM_NAMESPACE = "PubSubSubscriptionMultiplicityTestsNamespace"; - - private readonly ITestOutputHelper output; - private ClientStreamTestRunner runner; - - public PubSubClientStreamTests(ITestOutputHelper output) - { - this.output = output; - } - - public override async Task InitializeAsync() - { - await base.InitializeAsync(); - runner = new ClientStreamTestRunner(this.HostedCluster); - } - - protected override void ConfigureTestCluster(TestClusterBuilder builder) - { - if (!GoogleTestUtils.IsPubSubSimulatorAvailable.Value) - { - throw new SkipException("Google PubSub Simulator not available"); - } - - builder.AddSiloBuilderConfigurator(); - builder.AddClientBuilderConfigurator(); - } - - private class MySiloBuilderConfigurator : ISiloConfigurator - { - public void Configure(ISiloBuilder hostBuilder) - { - hostBuilder - .AddMemoryGrainStorage("PubSubStore") - .AddPubSubStreams(PROVIDER_NAME, options => - { - options.ProjectId = GoogleTestUtils.ProjectId; - options.TopicId = GoogleTestUtils.TopicId; - options.Deadline = TimeSpan.FromSeconds(600); - }) - .Configure(options => options.ClientDropTimeout = TimeSpan.FromSeconds(5)); - } - } - - private class MyClientBuilderConfigurator : IClientBuilderConfigurator - { - public void Configure(IConfiguration configuration, IClientBuilder clientBuilder) - { - clientBuilder - .AddPubSubStreams(PROVIDER_NAME, options => - { - options.ProjectId = GoogleTestUtils.ProjectId; - options.TopicId = GoogleTestUtils.TopicId; - options.Deadline = TimeSpan.FromSeconds(600); - }); - } - } - - [SkippableFact] - public async Task GPS_StreamProducerOnDroppedClientTest() - { - logger.Info("************************ AQStreamProducerOnDroppedClientTest *********************************"); - await runner.StreamProducerOnDroppedClientTest(PROVIDER_NAME, STREAM_NAMESPACE); - } - - [SkippableFact(Skip = "PubSub has unpredictable event delivery counts - re-enable when we figure out how to handle this.")] - public async Task GPS_StreamConsumerOnDroppedClientTest() - { - logger.Info("************************ AQStreamConsumerOnDroppedClientTest *********************************"); - await runner.StreamConsumerOnDroppedClientTest(PROVIDER_NAME, STREAM_NAMESPACE, output); - } - } -} diff --git a/test/Extensions/GoogleUtils.Tests/Streaming/PubSubStreamTests.cs b/test/Extensions/GoogleUtils.Tests/Streaming/PubSubStreamTests.cs deleted file mode 100644 index 2c3e9edbab..0000000000 --- a/test/Extensions/GoogleUtils.Tests/Streaming/PubSubStreamTests.cs +++ /dev/null @@ -1,186 +0,0 @@ -using System; -using System.Threading.Tasks; -using Microsoft.Extensions.Configuration; -using Orleans; -using Orleans.Hosting; -using Orleans.Providers.GCP.Streams.PubSub; -using Orleans.TestingHost; -using TestExtensions; -using UnitTests.Streaming; -using UnitTests.StreamingTests; -using Xunit; - -namespace GoogleUtils.Tests.Streaming -{ - [TestCategory("GCP"), TestCategory("PubSub")] - public class PubSubStreamTests : TestClusterPerTest - { - public static readonly string PUBSUB_STREAM_PROVIDER_NAME = "GPSProvider"; - private SingleStreamTestRunner runner; - - protected override void ConfigureTestCluster(TestClusterBuilder builder) - { - if (!GoogleTestUtils.IsPubSubSimulatorAvailable.Value) - { - throw new SkipException("Google PubSub Simulator not available"); - } - - builder.AddSiloBuilderConfigurator(); - builder.AddClientBuilderConfigurator(); - } - - private class MySiloBuilderConfigurator : ISiloConfigurator - { - public void Configure(ISiloBuilder hostBuilder) - { - hostBuilder - .AddMemoryGrainStorage("MemoryStore", op => op.NumStorageGrains = 1) - .AddMemoryGrainStorage("PubSubStorage") - .AddSimpleMessageStreamProvider("SMSProvider") - .AddPubSubStreams(PUBSUB_STREAM_PROVIDER_NAME, b=> - b.ConfigurePubSub(ob=>ob.Configure(options => - { - options.ProjectId = GoogleTestUtils.ProjectId; - options.TopicId = GoogleTestUtils.TopicId; - options.Deadline = TimeSpan.FromSeconds(600); - }))); - } - } - - private class MyClientBuilderConfigurator : IClientBuilderConfigurator - { - public void Configure(IConfiguration configuration, IClientBuilder clientBuilder) - { - clientBuilder - .AddSimpleMessageStreamProvider("SMSProvider") - .AddPubSubStreams(PUBSUB_STREAM_PROVIDER_NAME, b=> - b.ConfigurePubSub(ob=>ob.Configure(options => - { - options.ProjectId = GoogleTestUtils.ProjectId; - options.TopicId = GoogleTestUtils.TopicId; - options.Deadline = TimeSpan.FromSeconds(600); - }))); - } - } - - public override async Task InitializeAsync() - { - await base.InitializeAsync(); - runner = new SingleStreamTestRunner(InternalClient, PUBSUB_STREAM_PROVIDER_NAME); - } - - ////------------------------ One to One ----------------------// - - [SkippableFact] - public async Task GPS_01_OneProducerGrainOneConsumerGrain() - { - await runner.StreamTest_01_OneProducerGrainOneConsumerGrain(); - } - - [SkippableFact] - public async Task GPS_02_OneProducerGrainOneConsumerClient() - { - await runner.StreamTest_02_OneProducerGrainOneConsumerClient(); - } - - [SkippableFact] - public async Task GPS_03_OneProducerClientOneConsumerGrain() - { - await runner.StreamTest_03_OneProducerClientOneConsumerGrain(); - } - - [SkippableFact] - public async Task GPS_04_OneProducerClientOneConsumerClient() - { - await runner.StreamTest_04_OneProducerClientOneConsumerClient(); - } - - //------------------------ MANY to Many different grains ----------------------// - - [SkippableFact] - public async Task GPS_05_ManyDifferent_ManyProducerGrainsManyConsumerGrains() - { - await runner.StreamTest_05_ManyDifferent_ManyProducerGrainsManyConsumerGrains(); - } - - [SkippableFact] - public async Task GPS_06_ManyDifferent_ManyProducerGrainManyConsumerClients() - { - await runner.StreamTest_06_ManyDifferent_ManyProducerGrainManyConsumerClients(); - } - - [SkippableFact] - public async Task GPS_07_ManyDifferent_ManyProducerClientsManyConsumerGrains() - { - await runner.StreamTest_07_ManyDifferent_ManyProducerClientsManyConsumerGrains(); - } - - [SkippableFact] - public async Task GPS_08_ManyDifferent_ManyProducerClientsManyConsumerClients() - { - await runner.StreamTest_08_ManyDifferent_ManyProducerClientsManyConsumerClients(); - } - - //------------------------ MANY to Many Same grains ----------------------// - [SkippableFact] - public async Task GPS_09_ManySame_ManyProducerGrainsManyConsumerGrains() - { - await runner.StreamTest_09_ManySame_ManyProducerGrainsManyConsumerGrains(); - } - - [SkippableFact] - public async Task GPS_10_ManySame_ManyConsumerGrainsManyProducerGrains() - { - await runner.StreamTest_10_ManySame_ManyConsumerGrainsManyProducerGrains(); - } - - [SkippableFact] - public async Task GPS_11_ManySame_ManyProducerGrainsManyConsumerClients() - { - await runner.StreamTest_11_ManySame_ManyProducerGrainsManyConsumerClients(); - } - - [SkippableFact] - public async Task GPS_12_ManySame_ManyProducerClientsManyConsumerGrains() - { - await runner.StreamTest_12_ManySame_ManyProducerClientsManyConsumerGrains(); - } - - //------------------------ MANY to Many producer consumer same grain ----------------------// - - [SkippableFact] - public async Task GPS_13_SameGrain_ConsumerFirstProducerLater() - { - await runner.StreamTest_13_SameGrain_ConsumerFirstProducerLater(false); - } - - [SkippableFact] - public async Task GPS_14_SameGrain_ProducerFirstConsumerLater() - { - await runner.StreamTest_14_SameGrain_ProducerFirstConsumerLater(false); - } - - //----------------------------------------------// - - [SkippableFact] - public async Task GPS_15_ConsumeAtProducersRequest() - { - await runner.StreamTest_15_ConsumeAtProducersRequest(); - } - - [SkippableFact] - public async Task GPS_16_MultipleStreams_ManyDifferent_ManyProducerGrainsManyConsumerGrains() - { - var multiRunner = new MultipleStreamsTestRunner(InternalClient, PUBSUB_STREAM_PROVIDER_NAME, 16, false); - await multiRunner.StreamTest_MultipleStreams_ManyDifferent_ManyProducerGrainsManyConsumerGrains(); - } - - [SkippableFact] - public async Task GPS_17_MultipleStreams_1J_ManyProducerGrainsManyConsumerGrains() - { - var multiRunner = new MultipleStreamsTestRunner(InternalClient, PUBSUB_STREAM_PROVIDER_NAME, 17, false); - await multiRunner.StreamTest_MultipleStreams_ManyDifferent_ManyProducerGrainsManyConsumerGrains( - this.HostedCluster.StartAdditionalSilo); - } - } -} diff --git a/test/Extensions/GoogleUtils.Tests/Streaming/PubSubSubscriptionMultiplicityTests.cs b/test/Extensions/GoogleUtils.Tests/Streaming/PubSubSubscriptionMultiplicityTests.cs deleted file mode 100644 index 58a971bc08..0000000000 --- a/test/Extensions/GoogleUtils.Tests/Streaming/PubSubSubscriptionMultiplicityTests.cs +++ /dev/null @@ -1,125 +0,0 @@ -using Microsoft.Extensions.Configuration; -using Orleans; -using Orleans.Hosting; -using Orleans.Providers.GCP.Streams.PubSub; -using Orleans.Runtime; -using Orleans.TestingHost; -using System; -using System.Threading.Tasks; -using TestExtensions; -using UnitTests.StreamingTests; -using Xunit; - -namespace GoogleUtils.Tests.Streaming -{ - [TestCategory("GCP"), TestCategory("PubSub")] - public class PubSubSubscriptionMultiplicityTests : TestClusterPerTest - { - private const string PROVIDER_NAME = "PubSubProvider"; - private const string STREAM_NAMESPACE = "PubSubSubscriptionMultiplicityTestsNamespace"; - private SubscriptionMultiplicityTestRunner runner; - - protected override void ConfigureTestCluster(TestClusterBuilder builder) - { - if (!GoogleTestUtils.IsPubSubSimulatorAvailable.Value) - { - throw new SkipException("Google PubSub Simulator not available"); - } - - builder.Options.ClusterId = GoogleTestUtils.ProjectId; - builder.AddSiloBuilderConfigurator(); - builder.AddClientBuilderConfigurator(); - } - - private class MySiloBuilderConfigurator : ISiloConfigurator - { - public void Configure(ISiloBuilder hostBuilder) - { - hostBuilder - .AddMemoryGrainStorage("PubSubStore") - .AddPubSubStreams(PROVIDER_NAME, options => - { - options.ProjectId = GoogleTestUtils.ProjectId; - options.TopicId = GoogleTestUtils.TopicId; - options.Deadline = TimeSpan.FromSeconds(600); - }); - } - } - - private class MyClientBuilderConfigurator : IClientBuilderConfigurator - { - public void Configure(IConfiguration configuration, IClientBuilder clientBuilder) - { - clientBuilder - .AddPubSubStreams(PROVIDER_NAME, options => - { - options.ProjectId = GoogleTestUtils.ProjectId; - options.TopicId = GoogleTestUtils.TopicId; - options.Deadline = TimeSpan.FromSeconds(600); - }); - } - } - - public override async Task InitializeAsync() - { - await base.InitializeAsync(); - runner = new SubscriptionMultiplicityTestRunner(PROVIDER_NAME, HostedCluster); - } - - [SkippableFact] - public async Task GPS_MultipleParallelSubscriptionTest() - { - logger.Info("************************ GPS_MultipleParallelSubscriptionTest *********************************"); - await runner.MultipleParallelSubscriptionTest(Guid.NewGuid(), STREAM_NAMESPACE); - } - - [SkippableFact] - public async Task GPS_MultipleLinearSubscriptionTest() - { - logger.Info("************************ GPS_MultipleLinearSubscriptionTest *********************************"); - await runner.MultipleLinearSubscriptionTest(Guid.NewGuid(), STREAM_NAMESPACE); - } - - [SkippableFact] - public async Task GPS_MultipleSubscriptionTest_AddRemove() - { - logger.Info("************************ GPS_MultipleSubscriptionTest_AddRemove *********************************"); - await runner.MultipleSubscriptionTest_AddRemove(Guid.NewGuid(), STREAM_NAMESPACE); - } - - [SkippableFact] - public async Task GPS_ResubscriptionTest() - { - logger.Info("************************ GPS_ResubscriptionTest *********************************"); - await runner.ResubscriptionTest(Guid.NewGuid(), STREAM_NAMESPACE); - } - - [SkippableFact] - public async Task GPS_ResubscriptionAfterDeactivationTest() - { - logger.Info("************************ ResubscriptionAfterDeactivationTest *********************************"); - await runner.ResubscriptionAfterDeactivationTest(Guid.NewGuid(), STREAM_NAMESPACE); - } - - [SkippableFact] - public async Task GPS_ActiveSubscriptionTest() - { - logger.Info("************************ GPS_ActiveSubscriptionTest *********************************"); - await runner.ActiveSubscriptionTest(Guid.NewGuid(), STREAM_NAMESPACE); - } - - [SkippableFact] - public async Task GPS_TwoIntermitentStreamTest() - { - logger.Info("************************ GPS_TwoIntermitentStreamTest *********************************"); - await runner.TwoIntermitentStreamTest(Guid.NewGuid()); - } - - [SkippableFact] - public async Task GPS_SubscribeFromClientTest() - { - logger.Info("************************ GPS_SubscribeFromClientTest *********************************"); - await runner.SubscribeFromClientTest(Guid.NewGuid(), STREAM_NAMESPACE); - } - } -}