diff --git a/Blumchen.sln b/Blumchen.sln index a5020ed..f8510ef 100644 --- a/Blumchen.sln +++ b/Blumchen.sln @@ -43,6 +43,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "postgres", "postgres", "{8A EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "demo", "demo", "{A4044484-FE08-4399-8239-14AABFA30AD7}" EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SubscriberWorker", "src\SubscriberWorker\SubscriberWorker.csproj", "{DB58DB36-0366-4ABA-BC06-FCA9BB10EB92}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -69,6 +71,10 @@ Global {2BBDA071-FB1C-4D62-A954-B22EA6B1C738}.Debug|Any CPU.Build.0 = Debug|Any CPU {2BBDA071-FB1C-4D62-A954-B22EA6B1C738}.Release|Any CPU.ActiveCfg = Release|Any CPU {2BBDA071-FB1C-4D62-A954-B22EA6B1C738}.Release|Any CPU.Build.0 = Release|Any CPU + {DB58DB36-0366-4ABA-BC06-FCA9BB10EB92}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {DB58DB36-0366-4ABA-BC06-FCA9BB10EB92}.Debug|Any CPU.Build.0 = Debug|Any CPU + {DB58DB36-0366-4ABA-BC06-FCA9BB10EB92}.Release|Any CPU.ActiveCfg = Release|Any CPU + {DB58DB36-0366-4ABA-BC06-FCA9BB10EB92}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -79,6 +85,7 @@ Global {F81E2D5B-FC59-4396-A911-56BE65E4FE80} = {A4044484-FE08-4399-8239-14AABFA30AD7} {C050E9E8-3FB6-4581-953F-31826E385FB4} = {CD59A1A0-F40D-4047-87A3-66C0F1519FA5} {8AAAA344-B5FD-48D9-B2BA-379E374448D4} = {CD59A1A0-F40D-4047-87A3-66C0F1519FA5} + {DB58DB36-0366-4ABA-BC06-FCA9BB10EB92} = {A4044484-FE08-4399-8239-14AABFA30AD7} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {9A868C51-0460-4700-AF33-E1A921192614} diff --git a/README.md b/README.md index 48856b2..36fd616 100644 --- a/README.md +++ b/README.md @@ -18,7 +18,7 @@ Main logic is placed in [EventsSubscription](./src/Blumchen/Subscriptions/Subscr ```shell docker-compose up ``` -2. Run(order doesn't matter) Publisher and Subscriber apps, under 'demo' folder, from vs-studio, and follow Publisher instructions. +2. Run(order doesn't matter) Publisher and (Subscriber or SubscriberWorker) apps, under 'demo' folder, from vs-studio, and follow Publisher instructions. ## Testing (against default docker instance) diff --git a/src/Blumchen.DependencyInjection/Blumchen.DependencyInjection.csproj b/src/Blumchen.DependencyInjection/Blumchen.DependencyInjection.csproj new file mode 100644 index 0000000..5c7b408 --- /dev/null +++ b/src/Blumchen.DependencyInjection/Blumchen.DependencyInjection.csproj @@ -0,0 +1,54 @@ + + + + 0.1.1 + net8.0 + true + true + true + false + true + true + true + true + 12.0 + Oskar Dudycz + + https://github.com/event-driven-io/Blumchen + MIT + https://github.com/event-driven-io/Blumchen.git + true + Blumchen + true + true + true + snupkg + Blumchen + true + enable + enable + + + + 1591 + + + + 1591 + + + + + all + none + all + + + + + + + + + + diff --git a/src/Blumchen/Blumchen.csproj b/src/Blumchen/Blumchen.csproj index 4e6b52c..0138080 100644 --- a/src/Blumchen/Blumchen.csproj +++ b/src/Blumchen/Blumchen.csproj @@ -1,7 +1,7 @@ - 0.1.0 + 0.1.1 net8.0 true true @@ -25,20 +25,35 @@ snupkg Blumchen + + + 1591 + + + + + 1591 + + <_Parameter1>Tests + + <_Parameter1>Blumchen.DependencyInjection + - + all none all + + diff --git a/src/Blumchen/Database/Run.cs b/src/Blumchen/Database/Run.cs index 4e03b17..f4ccec1 100644 --- a/src/Blumchen/Database/Run.cs +++ b/src/Blumchen/Database/Run.cs @@ -4,8 +4,6 @@ using Blumchen.Subscriptions.ReplicationMessageHandlers; using Npgsql; -#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member - namespace Blumchen.Database; public static class Run diff --git a/src/Blumchen/DependencyInjection/ServiceCollectionExtensions.cs b/src/Blumchen/DependencyInjection/ServiceCollectionExtensions.cs new file mode 100644 index 0000000..c01d5ab --- /dev/null +++ b/src/Blumchen/DependencyInjection/ServiceCollectionExtensions.cs @@ -0,0 +1,23 @@ +using Blumchen.Subscriptions; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; + +#pragma warning disable IL2091 + +namespace Blumchen.DependencyInjection; + +public static class ServiceCollectionExtensions +{ + + public static IServiceCollection AddBlumchen( + this IServiceCollection service, + Func workerOptions) + where T : class, IHandler => + service + .AddKeyedSingleton(typeof(T), (provider, _) => workerOptions(provider, new WorkerOptionsBuilder()).Build()) + .AddHostedService(provider => + new Worker(workerOptions(provider, new WorkerOptionsBuilder()).Build(), + ServiceProviderServiceExtensions.GetRequiredService>>(provider))); + + +} diff --git a/src/Blumchen/DependencyInjection/Worker.cs b/src/Blumchen/DependencyInjection/Worker.cs new file mode 100644 index 0000000..4f137b7 --- /dev/null +++ b/src/Blumchen/DependencyInjection/Worker.cs @@ -0,0 +1,41 @@ +using System.Collections.Concurrent; +using Blumchen.Subscriptions; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; + +namespace Blumchen.DependencyInjection; + +public class Worker( + WorkerOptions options, + ILogger> logger): BackgroundService where T : class, IHandler +{ + private string WorkerName { get; } = $"{nameof(Worker)}<{typeof(T).Name}>"; + private static readonly ConcurrentDictionary> LoggingActions = new(StringComparer.OrdinalIgnoreCase); + private static void Notify(ILogger logger, LogLevel level, string template, params object[] parameters) + { + static Action LoggerAction(LogLevel ll, bool enabled) => + (ll, enabled) switch + { + (LogLevel.Information, true) => (logger, template, parameters) => logger.LogInformation(template, parameters), + (LogLevel.Debug, true) => (logger, template, parameters) => logger.LogDebug(template, parameters), + (_, _) => (_, _, _) => { } + }; + LoggingActions.GetOrAdd(template,_ => LoggerAction(level, logger.IsEnabled(level)))(logger, template, parameters); + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + await options.ResiliencePipeline.ExecuteAsync(async token => + { + await using var subscription = new Subscription(); + await using var cursor = subscription.Subscribe(options.SubscriptionOptions, ct: token) + .GetAsyncEnumerator(token); + Notify(logger, LogLevel.Information,"{WorkerName} started", WorkerName); + while (await cursor.MoveNextAsync().ConfigureAwait(false) && !token.IsCancellationRequested) + Notify(logger, LogLevel.Debug, "{cursor.Current} processed", cursor.Current); + + }, stoppingToken).ConfigureAwait(false); + Notify(logger, LogLevel.Information, "{WorkerName} stopped", WorkerName); + } + +} diff --git a/src/Blumchen/DependencyInjection/WorkerOptionsBuilder.cs b/src/Blumchen/DependencyInjection/WorkerOptionsBuilder.cs new file mode 100644 index 0000000..07c79c5 --- /dev/null +++ b/src/Blumchen/DependencyInjection/WorkerOptionsBuilder.cs @@ -0,0 +1,37 @@ +using Blumchen.Subscriptions; +using Polly; + +namespace Blumchen.DependencyInjection; + +public record WorkerOptions(ResiliencePipeline ResiliencePipeline, ISubscriptionOptions SubscriptionOptions); + +public interface IWorkerOptionsBuilder +{ + IWorkerOptionsBuilder ResiliencyPipeline(ResiliencePipeline resiliencePipeline); + IWorkerOptionsBuilder Subscription(Func? builder); + WorkerOptions Build(); +} + +internal sealed class WorkerOptionsBuilder: IWorkerOptionsBuilder +{ + private ResiliencePipeline? _resiliencePipeline = default; + private Func? _builder; + + public IWorkerOptionsBuilder ResiliencyPipeline(ResiliencePipeline resiliencePipeline) + { + _resiliencePipeline = resiliencePipeline; + return this; + }public IWorkerOptionsBuilder Subscription(Func? builder) + { + _builder = builder; + return this; + } + + public WorkerOptions Build() + { + ArgumentNullException.ThrowIfNull(_resiliencePipeline); + ArgumentNullException.ThrowIfNull(_builder); + return new(_resiliencePipeline, _builder(new SubscriptionOptionsBuilder()).Build()); + } +} + diff --git a/src/Blumchen/MessageTableOptions.cs b/src/Blumchen/MessageTableOptions.cs index a7fe0b1..ec91b9b 100644 --- a/src/Blumchen/MessageTableOptions.cs +++ b/src/Blumchen/MessageTableOptions.cs @@ -1,9 +1,9 @@ using Blumchen.Subscriptions; +using JetBrains.Annotations; using NpgsqlTypes; namespace Blumchen; -#pragma warning disable CS1591 public record TableDescriptorBuilder { private MessageTable TableDescriptor { get; set; } = new(); @@ -34,6 +34,9 @@ public TableDescriptorBuilder MessageType(string name, int dimension = 250) return this; } + [UsedImplicitly] + public TableDescriptorBuilder UseDefaults() => this; + public record MessageTable(string Name = MessageTable.DefaultName) { internal const string DefaultName = "outbox"; diff --git a/src/Blumchen/Publications/MessageAppender.cs b/src/Blumchen/Publications/MessageAppender.cs index 623c8b4..860d80b 100644 --- a/src/Blumchen/Publications/MessageAppender.cs +++ b/src/Blumchen/Publications/MessageAppender.cs @@ -3,12 +3,11 @@ using Npgsql; namespace Blumchen.Publications; -#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member public static class MessageAppender { public static async Task AppendAsync(T @input - , (TableDescriptorBuilder.MessageTable tableDescriptor, IJsonTypeResolver jsonTypeResolver) resolver + , PublisherOptions resolver , NpgsqlConnection connection , NpgsqlTransaction transaction , CancellationToken ct @@ -19,10 +18,10 @@ public static async Task AppendAsync(T @input case null: throw new ArgumentNullException(nameof(@input)); case IEnumerable inputs: - await AppendBatchAsyncOfT(inputs, resolver.tableDescriptor, resolver.jsonTypeResolver, connection, transaction, ct).ConfigureAwait(false); + await AppendBatchAsyncOfT(inputs, resolver.TableDescriptor, resolver.JsonTypeResolver, connection, transaction, ct).ConfigureAwait(false); break; default: - await AppendAsyncOfT(input, resolver.tableDescriptor, resolver.jsonTypeResolver, connection, transaction, ct).ConfigureAwait(false); + await AppendAsyncOfT(input, resolver.TableDescriptor, resolver.JsonTypeResolver, connection, transaction, ct).ConfigureAwait(false); break; } } @@ -36,30 +35,32 @@ private static async Task AppendAsyncOfT(T input { var (typeName, jsonTypeInfo) = typeResolver.Resolve(typeof(T)); var data = JsonSerialization.ToJson(@input, jsonTypeInfo); - var command = new NpgsqlCommand( + + await using var command = new NpgsqlCommand( $"INSERT INTO {tableDescriptor.Name}({tableDescriptor.MessageType.Name}, {tableDescriptor.Data.Name}) values ('{typeName}', '{data}')", connection, transaction ); + await command.PrepareAsync(ct).ConfigureAwait(false); await command.ExecuteNonQueryAsync(ct).ConfigureAwait(false); } public static async Task AppendAsync(T input - , (TableDescriptorBuilder.MessageTable tableDescriptor, IJsonTypeResolver resolver) options + , PublisherOptions options , string connectionString , CancellationToken ct) where T: class { var type = typeof(T); - var (typeName, jsonTypeInfo) = options.resolver.Resolve(type); + var (typeName, jsonTypeInfo) = options.JsonTypeResolver.Resolve(type); var data = JsonSerialization.ToJson(input, jsonTypeInfo); - var connection = new NpgsqlConnection(connectionString); - await using var connection1 = connection.ConfigureAwait(false); + await using var connection = new NpgsqlConnection(connectionString); await connection.OpenAsync(ct).ConfigureAwait(false); - var command = connection.CreateCommand(); + await using var command = connection.CreateCommand(); command.CommandText = - $"INSERT INTO {options.tableDescriptor.Name}({options.tableDescriptor.MessageType.Name}, {options.tableDescriptor.Data.Name}) values ('{typeName}', '{data}')"; + $"INSERT INTO {options.TableDescriptor.Name}({options.TableDescriptor.MessageType.Name}, {options.TableDescriptor.Data.Name}) values ('{typeName}', '{data}')"; + await command.PrepareAsync(ct).ConfigureAwait(false); await command.ExecuteNonQueryAsync(ct).ConfigureAwait(false); } @@ -70,7 +71,7 @@ private static async Task AppendBatchAsyncOfT(T inputs , NpgsqlTransaction transaction , CancellationToken ct) where T : class, IEnumerable { - var batch = new NpgsqlBatch(connection, transaction); + await using var batch = new NpgsqlBatch(connection, transaction); foreach (var input in inputs) { var (typeName, jsonTypeInfo) = resolver.Resolve(input.GetType()); diff --git a/src/Blumchen/Publications/PublisherOptions.cs b/src/Blumchen/Publications/PublisherOptions.cs new file mode 100644 index 0000000..d25c048 --- /dev/null +++ b/src/Blumchen/Publications/PublisherOptions.cs @@ -0,0 +1,20 @@ +using Blumchen.Database; +using Blumchen.Serialization; +using Npgsql; + +namespace Blumchen.Publications; + +public record PublisherOptions(TableDescriptorBuilder.MessageTable TableDescriptor, IJsonTypeResolver JsonTypeResolver); + +public static class PublisherOptionsExtensions +{ + public static async Task EnsureTable(this PublisherOptions publisherOptions, NpgsqlDataSource dataSource, CancellationToken ct) + { + await dataSource.EnsureTableExists(publisherOptions.TableDescriptor, ct); + return publisherOptions; + } + + public static Task EnsureTable(this PublisherOptions publisherOptions, + string connectionString, CancellationToken ct) + => EnsureTable(publisherOptions, new NpgsqlDataSourceBuilder(connectionString).Build(), ct); +} diff --git a/src/Blumchen/Publications/PublisherSetupOptionsBuilder.cs b/src/Blumchen/Publications/PublisherSetupOptionsBuilder.cs index 90e7792..e42c4a2 100644 --- a/src/Blumchen/Publications/PublisherSetupOptionsBuilder.cs +++ b/src/Blumchen/Publications/PublisherSetupOptionsBuilder.cs @@ -5,7 +5,6 @@ namespace Blumchen.Publications; -#pragma warning disable CS1591 public class PublisherSetupOptionsBuilder { private INamingPolicy? _namingPolicy; @@ -34,7 +33,7 @@ public PublisherSetupOptionsBuilder WithTable(Func { @@ -25,8 +24,8 @@ internal sealed class JsonTypeResolver( internal void WhiteList(Type type) { var typeInfo = SerializationContext.GetTypeInfo(type) ?? throw new NotSupportedException(type.FullName); - _typeDictionary.AddOrUpdate(_namingPolicy.Bind(typeInfo.Type), _ => typeInfo.Type, (s,t) =>typeInfo.Type); - _typeInfoDictionary.AddOrUpdate(typeInfo.Type, _ => typeInfo, (_,__)=> typeInfo); + _typeDictionary.AddOrUpdate(_namingPolicy.Bind(typeInfo.Type), _ => typeInfo.Type, (_,_) =>typeInfo.Type); + _typeInfoDictionary.AddOrUpdate(typeInfo.Type, _ => typeInfo, (_,_)=> typeInfo); } public (string, JsonTypeInfo) Resolve(Type type) => diff --git a/src/Blumchen/Serialization/JsonSerialization.cs b/src/Blumchen/Serialization/JsonSerialization.cs index feb4095..6fdfad0 100644 --- a/src/Blumchen/Serialization/JsonSerialization.cs +++ b/src/Blumchen/Serialization/JsonSerialization.cs @@ -4,7 +4,6 @@ using Blumchen.Streams; namespace Blumchen.Serialization; -#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member public static class JsonSerialization { diff --git a/src/Blumchen/Serialization/MessageUrnAttribute.cs b/src/Blumchen/Serialization/MessageUrnAttribute.cs index 8963e34..11ae685 100644 --- a/src/Blumchen/Serialization/MessageUrnAttribute.cs +++ b/src/Blumchen/Serialization/MessageUrnAttribute.cs @@ -1,7 +1,6 @@ using System.Collections.Concurrent; namespace Blumchen.Serialization; -#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member [AttributeUsage(AttributeTargets.Class | AttributeTargets.Interface)] public class MessageUrnAttribute: diff --git a/src/Blumchen/Subscriptions/IConsume.cs b/src/Blumchen/Subscriptions/IConsume.cs deleted file mode 100644 index 4a59c0f..0000000 --- a/src/Blumchen/Subscriptions/IConsume.cs +++ /dev/null @@ -1,9 +0,0 @@ -namespace Blumchen.Subscriptions; -#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member - -public interface IConsume; - -public interface IConsumes: IConsume where T : class -{ - Task Handle(T value); -} diff --git a/src/Blumchen/Subscriptions/IErrorProcessor.cs b/src/Blumchen/Subscriptions/IErrorProcessor.cs new file mode 100644 index 0000000..b3cec9d --- /dev/null +++ b/src/Blumchen/Subscriptions/IErrorProcessor.cs @@ -0,0 +1,11 @@ +namespace Blumchen.Subscriptions; + +public interface IErrorProcessor +{ + Func Process { get; } +} + +public record ConsoleOutErrorProcessor: IErrorProcessor +{ + public Func Process => exception => Console.Out.WriteLineAsync($"record id:{0} resulted in error:{exception.Message}"); +} diff --git a/src/Blumchen/Subscriptions/IHandler.cs b/src/Blumchen/Subscriptions/IHandler.cs new file mode 100644 index 0000000..b722f25 --- /dev/null +++ b/src/Blumchen/Subscriptions/IHandler.cs @@ -0,0 +1,8 @@ +namespace Blumchen.Subscriptions; + +public interface IHandler; + +public interface IHandler: IHandler where T : class +{ + Task Handle(T value); +} diff --git a/src/Blumchen/Subscriptions/ISubscriptionOptions.cs b/src/Blumchen/Subscriptions/ISubscriptionOptions.cs index 1be177c..27dd513 100644 --- a/src/Blumchen/Subscriptions/ISubscriptionOptions.cs +++ b/src/Blumchen/Subscriptions/ISubscriptionOptions.cs @@ -1,31 +1,35 @@ using Blumchen.Subscriptions.Replication; using JetBrains.Annotations; +using Npgsql; using static Blumchen.Subscriptions.Management.PublicationManagement; using static Blumchen.Subscriptions.Management.ReplicationSlotManagement; namespace Blumchen.Subscriptions; -internal interface ISubscriptionOptions +public interface ISubscriptionOptions { - [UsedImplicitly] string ConnectionString { get; } + [UsedImplicitly] NpgsqlDataSource DataSource { get; } + [UsedImplicitly] NpgsqlConnectionStringBuilder ConnectionStringBuilder { get; } IReplicationDataMapper DataMapper { get; } [UsedImplicitly] PublicationSetupOptions PublicationOptions { get; } [UsedImplicitly] ReplicationSlotSetupOptions ReplicationOptions { get; } [UsedImplicitly] IErrorProcessor ErrorProcessor { get; } void Deconstruct( - out string connectionString, + out NpgsqlDataSource dataSource, + out NpgsqlConnectionStringBuilder connectionStringBuilder, out PublicationSetupOptions publicationSetupOptions, out ReplicationSlotSetupOptions replicationSlotSetupOptions, out IErrorProcessor errorProcessor, out IReplicationDataMapper dataMapper, - out Dictionary registry); + out Dictionary registry); } internal record SubscriptionOptions( - string ConnectionString, + NpgsqlDataSource DataSource, + NpgsqlConnectionStringBuilder ConnectionStringBuilder, PublicationSetupOptions PublicationOptions, ReplicationSlotSetupOptions ReplicationOptions, IErrorProcessor ErrorProcessor, IReplicationDataMapper DataMapper, - Dictionary Registry): ISubscriptionOptions; + Dictionary Registry): ISubscriptionOptions; diff --git a/src/Blumchen/Subscriptions/Management/PublicationManagement.cs b/src/Blumchen/Subscriptions/Management/PublicationManagement.cs index 64e6033..c2b6f6c 100644 --- a/src/Blumchen/Subscriptions/Management/PublicationManagement.cs +++ b/src/Blumchen/Subscriptions/Management/PublicationManagement.cs @@ -3,7 +3,6 @@ using Npgsql; #pragma warning disable CA2208 -#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member namespace Blumchen.Subscriptions.Management; @@ -70,14 +69,11 @@ internal static Task CreatePublication( ISet eventTypes, CancellationToken ct ) { + var sql = $"CREATE PUBLICATION \"{publicationName}\" FOR TABLE {tableName} {{0}} WITH (publish = 'insert');"; return eventTypes.Count switch { - 0 => Execute(dataSource, $"CREATE PUBLICATION {publicationName} FOR TABLE {tableName} WITH (publish = 'insert');", - ct - ), - _ => Execute(dataSource, $"CREATE PUBLICATION {publicationName} FOR TABLE {tableName} WHERE ({PublicationFilter(eventTypes)}) WITH (publish = 'insert');", - ct - ) + 0 => Execute(dataSource, string.Format(sql,string.Empty), ct), + _ => Execute(dataSource, string.Format(sql, $"WHERE ({PublicationFilter(eventTypes)})"), ct) }; static string PublicationFilter(ICollection input) => string.Join(" OR ", input.Select(s => $"message_type = '{s}'")); } @@ -129,8 +125,7 @@ private static Task PublicationExists( this NpgsqlDataSource dataSource, string publicationName, CancellationToken ct - ) => - dataSource.Exists("pg_publication", "pubname = $1", [publicationName], ct); + ) => dataSource.Exists("pg_publication", "pubname = $1", [publicationName], ct); public abstract record SetupPublicationResult { diff --git a/src/Blumchen/Subscriptions/Management/ReplicationSlotManagement.cs b/src/Blumchen/Subscriptions/Management/ReplicationSlotManagement.cs index 4595dd4..adeb6a8 100644 --- a/src/Blumchen/Subscriptions/Management/ReplicationSlotManagement.cs +++ b/src/Blumchen/Subscriptions/Management/ReplicationSlotManagement.cs @@ -5,11 +5,16 @@ namespace Blumchen.Subscriptions.Management; using static ReplicationSlotManagement.CreateReplicationSlotResult; -#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member public static class ReplicationSlotManagement { #pragma warning disable CA2208 + private static Task ReplicationSlotExists( + this NpgsqlDataSource dataSource, + string slotName, + CancellationToken ct + ) => dataSource.Exists("pg_replication_slots", "slot_name ILIKE $1", [slotName], ct); + public static async Task SetupReplicationSlot( this NpgsqlDataSource dataSource, LogicalReplicationConnection connection, @@ -54,18 +59,12 @@ static async Task Create( } } - private static Task ReplicationSlotExists( - this NpgsqlDataSource dataSource, - string slotName, - CancellationToken ct - ) => dataSource.Exists("pg_replication_slots", "slot_name = $1", [slotName], ct); - public record ReplicationSlotSetupOptions( string SlotName = $"{TableDescriptorBuilder.MessageTable.DefaultName}_slot", Subscription.CreateStyle CreateStyle = Subscription.CreateStyle.WhenNotExists, - bool Binary = false //https://www.postgresql.org/docs/current/sql-createsubscription.html#SQL-CREATESUBSCRIPTION-WITH-BINARY + bool Binary = + false //https://www.postgresql.org/docs/current/sql-createsubscription.html#SQL-CREATESUBSCRIPTION-WITH-BINARY ); - public abstract record CreateReplicationSlotResult { public record None: CreateReplicationSlotResult; diff --git a/src/Blumchen/Subscriptions/MimeType.cs b/src/Blumchen/Subscriptions/MimeType.cs index 8bb3ef9..1197908 100644 --- a/src/Blumchen/Subscriptions/MimeType.cs +++ b/src/Blumchen/Subscriptions/MimeType.cs @@ -1,6 +1,5 @@ namespace Blumchen.Subscriptions; -#pragma warning disable CS1591 public abstract record MimeType(string mimeType) { public record Json(): MimeType("application/json"); diff --git a/src/Blumchen/Subscriptions/ObjectTracingConsumer.cs b/src/Blumchen/Subscriptions/ObjectTracingConsumer.cs new file mode 100644 index 0000000..3263482 --- /dev/null +++ b/src/Blumchen/Subscriptions/ObjectTracingConsumer.cs @@ -0,0 +1,11 @@ +namespace Blumchen.Subscriptions; + +internal class ObjectTracingConsumer: IHandler +{ + private static ulong _counter = 0; + public Task Handle(object value) + { + Interlocked.Increment(ref _counter); + return Console.Out.WriteLineAsync(); + } +} diff --git a/src/Blumchen/Subscriptions/Replication/IReplicationDataMapper.cs b/src/Blumchen/Subscriptions/Replication/IReplicationDataMapper.cs index 90b75d2..1f74c3a 100644 --- a/src/Blumchen/Subscriptions/Replication/IReplicationDataMapper.cs +++ b/src/Blumchen/Subscriptions/Replication/IReplicationDataMapper.cs @@ -3,7 +3,6 @@ using Npgsql.Replication.PgOutput.Messages; namespace Blumchen.Subscriptions.Replication; -#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member public interface IReplicationDataMapper { diff --git a/src/Blumchen/Subscriptions/ReplicationMessageHandlers/Envelope.cs b/src/Blumchen/Subscriptions/ReplicationMessageHandlers/Envelope.cs index 099a627..aa2e812 100644 --- a/src/Blumchen/Subscriptions/ReplicationMessageHandlers/Envelope.cs +++ b/src/Blumchen/Subscriptions/ReplicationMessageHandlers/Envelope.cs @@ -1,5 +1,4 @@ namespace Blumchen.Subscriptions.ReplicationMessageHandlers; -#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member public interface IEnvelope; diff --git a/src/Blumchen/Subscriptions/SnapshotReader/SnapshotReader.cs b/src/Blumchen/Subscriptions/SnapshotReader/SnapshotReader.cs index 5603e79..eaaffe4 100644 --- a/src/Blumchen/Subscriptions/SnapshotReader/SnapshotReader.cs +++ b/src/Blumchen/Subscriptions/SnapshotReader/SnapshotReader.cs @@ -5,7 +5,6 @@ using Npgsql; namespace Blumchen.Subscriptions.SnapshotReader; -#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member public static class SnapshotReader { diff --git a/src/Blumchen/Subscriptions/Subscription.cs b/src/Blumchen/Subscriptions/Subscription.cs index 95854af..872df0b 100644 --- a/src/Blumchen/Subscriptions/Subscription.cs +++ b/src/Blumchen/Subscriptions/Subscription.cs @@ -5,14 +5,12 @@ using Blumchen.Subscriptions.Management; using Blumchen.Subscriptions.ReplicationMessageHandlers; using Blumchen.Subscriptions.SnapshotReader; -using Microsoft.Extensions.Logging; using Npgsql; using Npgsql.Replication; using Npgsql.Replication.PgOutput; using Npgsql.Replication.PgOutput.Messages; namespace Blumchen.Subscriptions; -#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member using static PublicationManagement; using static ReplicationSlotManagement; @@ -26,27 +24,31 @@ public enum CreateStyle AlwaysRecreate, Never } - private static LogicalReplicationConnection? _connection; - private static readonly SubscriptionOptionsBuilder Builder = new(); + private LogicalReplicationConnection? _connection; + private readonly SubscriptionOptionsBuilder _builder = new(); private ISubscriptionOptions? _options; public async IAsyncEnumerable Subscribe( Func builder, - ILoggerFactory? loggerFactory = null, [EnumeratorCancellation] CancellationToken ct = default ) { - _options = builder(Builder).Build(); - var (connectionString, publicationSetupOptions, replicationSlotSetupOptions, errorProcessor, replicationDataMapper, registry) = _options; - var dataSourceBuilder = new NpgsqlDataSourceBuilder(connectionString); - dataSourceBuilder.UseLoggerFactory(loggerFactory); + await foreach (var _ in Subscribe(builder(_builder).Build(), ct)) + yield return _; + } + + internal async IAsyncEnumerable Subscribe( + ISubscriptionOptions subscriptionOptions, + [EnumeratorCancellation] CancellationToken ct = default + ) + { + _options = subscriptionOptions; + var (dataSource, connectionStringBuilder, publicationSetupOptions, replicationSlotSetupOptions, errorProcessor, replicationDataMapper, registry) = subscriptionOptions; - var dataSource = dataSourceBuilder.Build(); await dataSource.EnsureTableExists(publicationSetupOptions.TableDescriptor, ct).ConfigureAwait(false); - _connection = new LogicalReplicationConnection(connectionString); + _connection = new LogicalReplicationConnection(connectionStringBuilder.ConnectionString); await _connection.Open(ct).ConfigureAwait(false); - await dataSource.SetupPublication(publicationSetupOptions, ct).ConfigureAwait(false); var result = await dataSource.SetupReplicationSlot(_connection, replicationSlotSetupOptions, ct).ConfigureAwait(false); @@ -66,8 +68,8 @@ public async IAsyncEnumerable Subscribe( ); await foreach (var envelope in ReadExistingRowsFromSnapshot(dataSource, created.SnapshotName, _options, ct).ConfigureAwait(false)) - await foreach (var subscribe in ProcessEnvelope(envelope, registry, errorProcessor).WithCancellation(ct).ConfigureAwait(false)) - yield return subscribe; + await foreach (var subscribe in ProcessEnvelope(envelope, registry, errorProcessor).WithCancellation(ct).ConfigureAwait(false)) + yield return subscribe; } await foreach (var message in @@ -89,7 +91,7 @@ public async IAsyncEnumerable Subscribe( private static async IAsyncEnumerable ProcessEnvelope( IEnvelope envelope, - Dictionary registry, + Dictionary registry, IErrorProcessor errorProcessor ) where T:class { @@ -110,14 +112,14 @@ IErrorProcessor errorProcessor } } - private static readonly Dictionary Cache = []; + private static readonly Dictionary Cache = []; - private static (IConsume consumer, MethodInfo methodInfo) Memoize + private static (IHandler consumer, MethodInfo methodInfo) Memoize ( - Dictionary registry, + Dictionary registry, Type objType, - Func, Type, (IConsume consumer, MethodInfo methodInfo)> func + Func, Type, (IHandler consumer, MethodInfo methodInfo)> func ) { if (!Cache.TryGetValue(objType, out var entry)) @@ -125,7 +127,7 @@ private static (IConsume consumer, MethodInfo methodInfo) Memoize Cache[objType] = entry; return entry; } - private static (IConsume consumer, MethodInfo methodInfo) Consumer(Dictionary registry, Type objType) + private static (IHandler consumer, MethodInfo methodInfo) Consumer(Dictionary registry, Type objType) { var consumer = registry[objType] ?? throw new NotSupportedException($"Unregistered type for {objType.AssemblyQualifiedName}"); var methodInfos = consumer.GetType().GetMethods(BindingFlags.Instance|BindingFlags.Public); diff --git a/src/Blumchen/Subscriptions/SubscriptionOptionsBuilder.cs b/src/Blumchen/Subscriptions/SubscriptionOptionsBuilder.cs index 13a97d8..8166ab2 100644 --- a/src/Blumchen/Subscriptions/SubscriptionOptionsBuilder.cs +++ b/src/Blumchen/Subscriptions/SubscriptionOptionsBuilder.cs @@ -2,45 +2,44 @@ using Blumchen.Subscriptions.Management; using Blumchen.Subscriptions.Replication; using JetBrains.Annotations; +using Npgsql; using System.Text.Json.Serialization; namespace Blumchen.Subscriptions; -#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member + public sealed class SubscriptionOptionsBuilder { - private static string? _connectionString; - private static PublicationManagement.PublicationSetupOptions _publicationSetupOptions; - private static ReplicationSlotManagement.ReplicationSlotSetupOptions? _replicationSlotSetupOptions; - private static IReplicationDataMapper? _dataMapper; - private readonly Dictionary _registry = []; + private NpgsqlConnectionStringBuilder? _connectionStringBuilder; + private NpgsqlDataSource? _dataSource; + private PublicationManagement.PublicationSetupOptions _publicationSetupOptions = new(); + private ReplicationSlotManagement.ReplicationSlotSetupOptions? _replicationSlotSetupOptions; + private IReplicationDataMapper? _dataMapper; + private readonly Dictionary _registry = []; private IErrorProcessor? _errorProcessor; private INamingPolicy? _namingPolicy; private JsonSerializerContext? _jsonSerializerContext; - private static readonly TableDescriptorBuilder TableDescriptorBuilder = new(); + private readonly TableDescriptorBuilder _tableDescriptorBuilder = new(); private TableDescriptorBuilder.MessageTable? _messageTable; - - - static SubscriptionOptionsBuilder() - { - _connectionString = null; - _publicationSetupOptions = new(); - _replicationSlotSetupOptions = default; - _dataMapper = default; - } - [UsedImplicitly] public SubscriptionOptionsBuilder WithTable( Func builder) { - _messageTable = builder(TableDescriptorBuilder).Build(); + _messageTable = builder(_tableDescriptorBuilder).Build(); return this; } [UsedImplicitly] public SubscriptionOptionsBuilder ConnectionString(string connectionString) { - _connectionString = connectionString; + _connectionStringBuilder = new NpgsqlConnectionStringBuilder(connectionString); + return this; + } + + [UsedImplicitly] + public SubscriptionOptionsBuilder DataSource(NpgsqlDataSource dataSource) + { + _dataSource = dataSource; return this; } @@ -74,10 +73,9 @@ public SubscriptionOptionsBuilder WithReplicationOptions(ReplicationSlotManageme } [UsedImplicitly] - public SubscriptionOptionsBuilder Consumes(TU consumer) where T : class - where TU : class, IConsumes + public SubscriptionOptionsBuilder Consumes(IHandler handler) where T : class { - _registry.TryAdd(typeof(T), consumer); + _registry.TryAdd(typeof(T), handler); return this; } @@ -90,9 +88,10 @@ public SubscriptionOptionsBuilder WithErrorProcessor(IErrorProcessor? errorProce internal ISubscriptionOptions Build() { - ArgumentNullException.ThrowIfNull(_connectionString); + _messageTable ??= _tableDescriptorBuilder.Build(); + ArgumentNullException.ThrowIfNull(_connectionStringBuilder); + ArgumentNullException.ThrowIfNull(_dataSource); ArgumentNullException.ThrowIfNull(_jsonSerializerContext); - ArgumentNullException.ThrowIfNull(_messageTable); var typeResolver = new JsonTypeResolver(_jsonSerializerContext, _namingPolicy); foreach (var type in _registry.Keys) typeResolver.WhiteList(type); @@ -104,37 +103,18 @@ internal ISubscriptionOptions Build() if (_registry.Count == 0)_registry.Add(typeof(object), new ObjectTracingConsumer()); return new SubscriptionOptions( - _connectionString, + _dataSource, + _connectionStringBuilder, _publicationSetupOptions, _replicationSlotSetupOptions ?? new ReplicationSlotManagement.ReplicationSlotSetupOptions(), _errorProcessor ?? new ConsoleOutErrorProcessor(), _dataMapper, _registry); + static void Ensure(Func> evalFn, string formattedMsg) { var misses = evalFn().ToArray(); if (misses.Length > 0) throw new Exception(string.Format(formattedMsg, string.Join(", ", misses.Select(t => $"'{t.Name}'")))); } - - } -} - -public class ObjectTracingConsumer: IConsumes -{ - private static ulong _counter = 0; - public Task Handle(object value) - { - Interlocked.Increment(ref _counter); - return Console.Out.WriteLineAsync(); } } - -public interface IErrorProcessor -{ - Func Process { get; } -} - -public record ConsoleOutErrorProcessor: IErrorProcessor -{ - public Func Process => exception => Console.Out.WriteLineAsync($"record id:{0} resulted in error:{exception.Message}"); -} diff --git a/src/PostgresOutboxPatternWithCDC.NET.Tests/LogicalReplicationTest.cs b/src/PostgresOutboxPatternWithCDC.NET.Tests/LogicalReplicationTest.cs deleted file mode 100644 index 93195a2..0000000 --- a/src/PostgresOutboxPatternWithCDC.NET.Tests/LogicalReplicationTest.cs +++ /dev/null @@ -1,96 +0,0 @@ -using Commons.Events; -using PostgresOutbox.Events; -using PostgresOutbox.Serialization; -using PostgresOutbox.Subscriptions; -using PostgresOutbox.Subscriptions.Replication; -using Xunit.Abstractions; -using static PostgresOutbox.Subscriptions.Management.PublicationManagement; -using static PostgresOutbox.Subscriptions.Management.ReplicationSlotManagement; - -namespace PostgresOutboxPatternWithCDC.NET.Tests; - -public class LogicalReplicationTest -{ - private readonly ITestOutputHelper testOutputHelper; - - public LogicalReplicationTest(ITestOutputHelper testOutputHelper) - { - this.testOutputHelper = testOutputHelper; - } - - [Fact] - public async Task WALSubscriptionForNewEventsShouldWork() - { - var cancellationTokenSource = new CancellationTokenSource(); - var ct = cancellationTokenSource.Token; - - var eventsTable = await CreateEventsTable(ConnectrionString, ct); - - var subscriptionOptions = new SubscriptionOptions( - ConnectrionString, - new PublicationSetupOptions(Randomise("events_pub"),eventsTable), - new ReplicationSlotSetupOptions(Randomise("events_slot")), - new EventDataMapper() - ); - var subscription = new Subscription(); - - var events = subscription.Subscribe(subscriptionOptions, ct); - - var @event = new UserCreated(Guid.NewGuid(), Guid.NewGuid().ToString()); - await EventsAppender.AppendAsync(eventsTable, @event, ConnectrionString, ct); - - await foreach (var readEvent in events.WithCancellation(ct)) - { - testOutputHelper.WriteLine(JsonSerialization.ToJson(readEvent)); - Assert.Equal(@event, readEvent); - return; - } - } - - [Fact] - public async Task WALSubscriptionForOldEventsShouldWork() - { - var cancellationTokenSource = new CancellationTokenSource(); - var ct = cancellationTokenSource.Token; - - var eventsTable = await CreateEventsTable(ConnectrionString, ct); - - var subscriptionOptions = new SubscriptionOptions( - ConnectrionString, - new PublicationSetupOptions(Randomise("events_pub"),eventsTable), - new ReplicationSlotSetupOptions(Randomise("events_slot")), - new EventDataMapper() - ); - var subscription = new Subscription(); - - var @event = new UserCreated(Guid.NewGuid(), Guid.NewGuid().ToString()); - await EventsAppender.AppendAsync(eventsTable, @event, ConnectrionString, ct); - - var events = subscription.Subscribe(subscriptionOptions, ct); - - await foreach (var readEvent in events) - { - testOutputHelper.WriteLine(JsonSerialization.ToJson(readEvent)); - Assert.Equal(@event, readEvent); - return; - } - } - - private async Task CreateEventsTable( - string connectionString, - CancellationToken ct - ) - { - var tableName = Randomise("events"); - - await EventsTable.Create(connectionString, tableName, ct); - - return tableName; - } - - private static string Randomise(string prefix) => - $"{prefix}_{Guid.NewGuid().ToString().Replace("-", "")}"; - - private const string ConnectrionString = - "PORT = 5432; HOST = localhost; TIMEOUT = 15; POOLING = True; MINPOOLSIZE = 1; MAXPOOLSIZE = 100; COMMANDTIMEOUT = 20; DATABASE = 'postgres'; PASSWORD = 'Password12!'; USER ID = 'postgres'"; -} diff --git a/src/PostgresOutboxPatternWithCDC.NET.Tests/PostgresOutboxPatternWithCDC.NET.Tests.csproj b/src/PostgresOutboxPatternWithCDC.NET.Tests/PostgresOutboxPatternWithCDC.NET.Tests.csproj deleted file mode 100644 index 9e0d56c..0000000 --- a/src/PostgresOutboxPatternWithCDC.NET.Tests/PostgresOutboxPatternWithCDC.NET.Tests.csproj +++ /dev/null @@ -1,25 +0,0 @@ - - - - net8.0 - - - - - - runtime; build; native; contentfiles; analyzers; buildtransitive - all - - - - all - runtime; build; native; contentfiles; analyzers; buildtransitive - - - - - - - - - diff --git a/src/PostgresOutboxPatternWithCDC.NET.Tests/Usings.cs b/src/PostgresOutboxPatternWithCDC.NET.Tests/Usings.cs deleted file mode 100644 index 8c927eb..0000000 --- a/src/PostgresOutboxPatternWithCDC.NET.Tests/Usings.cs +++ /dev/null @@ -1 +0,0 @@ -global using Xunit; \ No newline at end of file diff --git a/src/Publisher/Contracts.cs b/src/Publisher/Contracts.cs index e9995e9..8a10b46 100644 --- a/src/Publisher/Contracts.cs +++ b/src/Publisher/Contracts.cs @@ -16,15 +16,21 @@ internal record UserDeleted( string Name = "Deleted" ): IContract; -[MessageUrn("user-modified:v1")] //subscription ignored +[MessageUrn("user-modified:v1")] internal record UserModified( Guid Id, string Name = "Modified" ): IContract; +[MessageUrn("user-subscribed:v1")] +internal record UserSubscribed( + Guid Id, + string Name = "Subscribed" +): IContract; [JsonSourceGenerationOptions(WriteIndented = true)] [JsonSerializable(typeof(UserCreated))] [JsonSerializable(typeof(UserDeleted))] [JsonSerializable(typeof(UserModified))] +[JsonSerializable(typeof(UserSubscribed))] internal partial class SourceGenerationContext: JsonSerializerContext; diff --git a/src/Publisher/Program.cs b/src/Publisher/Program.cs index dae2987..cdbccc9 100644 --- a/src/Publisher/Program.cs +++ b/src/Publisher/Program.cs @@ -1,3 +1,4 @@ +using Blumchen.Database; using Blumchen.Publications; using Blumchen.Serialization; using Commons; @@ -6,14 +7,11 @@ using UserCreated = Publisher.UserCreated; using UserDeleted = Publisher.UserDeleted; using UserModified = Publisher.UserModified; +using UserSubscribed = Publisher.UserSubscribed; Console.Title = typeof(Program).Assembly.GetName().Name!; Console.WriteLine("How many messages do you want to publish?(press CTRL+C to exit):"); - -var resolver = new PublisherSetupOptionsBuilder() - .JsonContext(SourceGenerationContext.Default) - .NamingPolicy(new AttributeNamingPolicy()) - .Build(); +var cts = new CancellationTokenSource(); do { @@ -21,8 +19,20 @@ var line = Console.ReadLine(); if (line != null && int.TryParse(line, out var result)) { - var cts = new CancellationTokenSource(); + var resolver = await new PublisherSetupOptionsBuilder() + .JsonContext(SourceGenerationContext.Default) + .NamingPolicy(new AttributeNamingPolicy()) + .WithTable(builder => builder.UseDefaults()) //default, but explicit + .Build() + .EnsureTable(Settings.ConnectionString, cts.Token)//enforce table existence and conformity - db roundtrip + .ConfigureAwait(false); + + //Or you might want to verify at a later stage + await new NpgsqlDataSourceBuilder(Settings.ConnectionString) + .Build() + .EnsureTableExists(resolver.TableDescriptor, cts.Token).ConfigureAwait(false); + var messages = result / 4; var ct = cts.Token; var connection = new NpgsqlConnection(Settings.ConnectionString); await using var connection1 = connection.ConfigureAwait(false); @@ -30,12 +40,17 @@ //use a command for each message { var @events = Enumerable.Range(0, result).Select(i => - (i % 3) switch + (i % 4) switch { 0 => new UserCreated(Guid.NewGuid()) as object, 1 => new UserDeleted(Guid.NewGuid()), - _ => new UserModified(Guid.NewGuid()) + 2 => new UserModified(Guid.NewGuid()), + _ => new UserSubscribed(Guid.NewGuid()) }); + await Console.Out.WriteLineAsync($"Publishing {messages + ((result % 3 > 0) ? 1 : 0)} {nameof(UserCreated)}"); + await Console.Out.WriteLineAsync($"Publishing {messages + ((result % 3 > 1) ? 1 : 0)} {nameof(UserDeleted)}"); + await Console.Out.WriteLineAsync($"Publishing {messages + ((result % 3 > 2) ? 1 : 0)} {nameof(UserModified)}"); + await Console.Out.WriteLineAsync($"Publishing {messages} {nameof(UserSubscribed)}"); foreach (var @event in @events) { var transaction = await connection.BeginTransactionAsync(ct).ConfigureAwait(false); @@ -52,6 +67,9 @@ case UserModified m: await MessageAppender.AppendAsync(m, resolver, connection, transaction, ct).ConfigureAwait(false); break; + case UserSubscribed m: + await MessageAppender.AppendAsync(m, resolver, connection, transaction, ct).ConfigureAwait(false); + break; } await transaction.CommitAsync(ct).ConfigureAwait(false); @@ -63,6 +81,7 @@ throw; } } + await Console.Out.WriteLineAsync($"Published {result} messages!"); } //use a batch command //{ diff --git a/src/Subscriber/Program.cs b/src/Subscriber/Program.cs index c023ac0..1d129f8 100644 --- a/src/Subscriber/Program.cs +++ b/src/Subscriber/Program.cs @@ -2,6 +2,7 @@ using Blumchen.Subscriptions; using Commons; using Microsoft.Extensions.Logging; +using Npgsql; using Subscriber; #pragma warning disable CS8601 // Possible null reference assignment. @@ -20,8 +21,11 @@ try { + var dataSourceBuilder = new NpgsqlDataSourceBuilder(Settings.ConnectionString) + .UseLoggerFactory(LoggerFactory.Create(builder => builder.AddConsole())); var cursor = subscription.Subscribe( builder => builder + .DataSource(dataSourceBuilder.Build()) .ConnectionString(Settings.ConnectionString) .WithTable(options => options .Id("id") @@ -30,8 +34,8 @@ ) .NamingPolicy(new AttributeNamingPolicy()) .JsonContext(SourceGenerationContext.Default) - .Consumes(consumer) - .Consumes(consumer), LoggerFactory.Create(builder => builder.AddConsole()), ct + .Consumes(consumer) + .Consumes(consumer), ct:ct ).GetAsyncEnumerator(ct); await using var cursor1 = cursor.ConfigureAwait(false); while (await cursor.MoveNextAsync().ConfigureAwait(false) && !ct.IsCancellationRequested); @@ -46,8 +50,8 @@ namespace Subscriber { internal class Consumer: - IConsumes, - IConsumes + IHandler, + IHandler { public Task Handle(UserCreatedContract value) => Console.Out.WriteLineAsync(JsonSerialization.ToJson(value, SourceGenerationContext.Default.UserCreatedContract)); public Task Handle(UserDeletedContract value) => Console.Out.WriteLineAsync(JsonSerialization.ToJson(value, SourceGenerationContext.Default.UserDeletedContract)); diff --git a/src/Subscriber/Subscriber.csproj b/src/Subscriber/Subscriber.csproj index d5d5273..7878764 100644 --- a/src/Subscriber/Subscriber.csproj +++ b/src/Subscriber/Subscriber.csproj @@ -5,7 +5,8 @@ net8.0 enable enable - true + True + true false diff --git a/src/SubscriberWorker/Contracts.cs b/src/SubscriberWorker/Contracts.cs new file mode 100644 index 0000000..a3c501a --- /dev/null +++ b/src/SubscriberWorker/Contracts.cs @@ -0,0 +1,29 @@ +using System.Text.Json.Serialization; +using Blumchen.Serialization; + +namespace SubscriberWorker +{ + [MessageUrn("user-created:v1")] + public record UserCreatedContract( + Guid Id, + string Name + ); + + [MessageUrn("user-deleted:v1")] + public record UserDeletedContract( + Guid Id, + string Name + ); + + [MessageUrn("user-modified:v1")] //subscription ignored + public record UserModifiedContract( + Guid Id, + string Name = "Modified" + ); + + [JsonSourceGenerationOptions(WriteIndented = true)] + [JsonSerializable(typeof(UserCreatedContract))] + [JsonSerializable(typeof(UserDeletedContract))] + [JsonSerializable(typeof(UserModifiedContract))] + internal partial class SourceGenerationContext: JsonSerializerContext; +} diff --git a/src/SubscriberWorker/Handler.cs b/src/SubscriberWorker/Handler.cs new file mode 100644 index 0000000..3ecf3fb --- /dev/null +++ b/src/SubscriberWorker/Handler.cs @@ -0,0 +1,37 @@ +using Blumchen.Subscriptions; +using Microsoft.Extensions.Logging; +#pragma warning disable CS9113 // Parameter is unread. + +namespace SubscriberWorker; + +public class HandlerBase(ILogger logger) +{ + private Task ReportSuccess(int count) + { + if (logger.IsEnabled(LogLevel.Debug)) + logger.LogDebug($"Read #{count} messages {typeof(T).FullName}"); + return Task.CompletedTask; + } + + private int _counter; + private int _completed; + protected Task Handle(T value) => + Interlocked.Increment(ref _counter) % 10 == 0 + //Simulating some exception on out of process dependencies + ? Task.FromException(new Exception($"Error on publishing {typeof(T).FullName}")) + : ReportSuccess(Interlocked.Increment(ref _completed)); +} + +public class HandleImpl2(ILogger logger): HandlerBase(logger), + IHandler +{ + public Task Handle(UserDeletedContract value) => Handle(value); +} + +public class HandleImpl1(ILogger logger) : HandlerBase(logger), + IHandler, IHandler +{ + public Task Handle(UserCreatedContract value) => Handle(value); + + public Task Handle(UserModifiedContract value) => Handle(value); +} diff --git a/src/SubscriberWorker/Program.cs b/src/SubscriberWorker/Program.cs new file mode 100644 index 0000000..2c13d81 --- /dev/null +++ b/src/SubscriberWorker/Program.cs @@ -0,0 +1,98 @@ +using System.Text.Json.Serialization; +using Blumchen.DependencyInjection; +using Blumchen.Serialization; +using Blumchen.Subscriptions; +using Commons; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Polly.Retry; +using Polly; +using SubscriberWorker; +using Npgsql; +using Blumchen.Subscriptions.Management; +using Polly.Registry; + + +#pragma warning disable CS8601 // Possible null reference assignment. +Console.Title = typeof(Program).Assembly.GetName().Name; +#pragma warning restore CS8601 // Possible null reference assignment. + + + +AppDomain.CurrentDomain.UnhandledException += (_, e) => Console.Out.WriteLine(e.ExceptionObject.ToString()); +TaskScheduler.UnobservedTaskException += (_, e) => Console.Out.WriteLine(e.Exception.ToString()); + +var cancellationTokenSource = new CancellationTokenSource(); +var builder = Host.CreateApplicationBuilder(args); + +builder.Services + + .AddSingleton, HandleImpl1>() + .AddSingleton, HandleImpl1>() + .AddSingleton, HandleImpl2>() + + .AddSingleton() + + .AddSingleton() + .AddSingleton() + + .AddResiliencePipeline("default", (pipelineBuilder, _) => + pipelineBuilder + .AddRetry(new RetryStrategyOptions + { + BackoffType = DelayBackoffType.Constant, + Delay = TimeSpan.FromSeconds(5), + MaxRetryAttempts = int.MaxValue + }).Build()) + + .AddLogging(loggingBuilder => + { + loggingBuilder + .AddFilter("Microsoft", LogLevel.Warning) + .AddFilter("System", LogLevel.Warning) + .AddFilter("Npgsql", LogLevel.Information) + .AddFilter("Blumchen", LogLevel.Debug) + .AddFilter("SubscriberWorker", LogLevel.Debug) + .AddSimpleConsole(); + }) + .AddTransient(sp => + new NpgsqlDataSourceBuilder(Settings.ConnectionString) + .UseLoggerFactory(sp.GetRequiredService()).Build()) + + .AddBlumchen((provider, workerOptions) => + workerOptions + .Subscription(subscriptionOptions => + subscriptionOptions + .ConnectionString(Settings.ConnectionString) + .DataSource(provider.GetRequiredService()) + .WithReplicationOptions(new ReplicationSlotManagement.ReplicationSlotSetupOptions($"{nameof(HandleImpl1)}_slot")) + .WithPublicationOptions(new PublicationManagement.PublicationSetupOptions($"{nameof(HandleImpl1)}_pub")) + .WithErrorProcessor(provider.GetRequiredService()) + .NamingPolicy(provider.GetRequiredService()) + .JsonContext(SourceGenerationContext.Default) + .Consumes(provider.GetRequiredService>()) + .Consumes(provider.GetRequiredService>())) + .ResiliencyPipeline(provider.GetRequiredService>().GetPipeline("default")) + + ) + + .AddBlumchen((provider, workerOptions) => + workerOptions + .Subscription(subscriptionOptions => + subscriptionOptions.ConnectionString(Settings.ConnectionString) + .DataSource(provider.GetRequiredService()) + .WithReplicationOptions(new ReplicationSlotManagement.ReplicationSlotSetupOptions($"{nameof(HandleImpl2)}_slot")) + .WithPublicationOptions(new PublicationManagement.PublicationSetupOptions($"{nameof(HandleImpl2)}_pub")) + .WithErrorProcessor(provider.GetRequiredService()) + .NamingPolicy(provider.GetRequiredService()) + .JsonContext(SourceGenerationContext.Default) + .Consumes(provider.GetRequiredService>())) + .ResiliencyPipeline(provider.GetRequiredService>().GetPipeline("default")) + ) + ; + +await builder + .Build() + .RunAsync(cancellationTokenSource.Token) + .ConfigureAwait(false); diff --git a/src/SubscriberWorker/SubscriberWorker.csproj b/src/SubscriberWorker/SubscriberWorker.csproj new file mode 100644 index 0000000..ad5c3f6 --- /dev/null +++ b/src/SubscriberWorker/SubscriberWorker.csproj @@ -0,0 +1,22 @@ + + + + Exe + net8.0 + enable + enable + true + true + false + + + + + + + + + + + + diff --git a/src/Tests/DatabaseFixture.cs b/src/Tests/DatabaseFixture.cs index fc71e14..1b2f1af 100644 --- a/src/Tests/DatabaseFixture.cs +++ b/src/Tests/DatabaseFixture.cs @@ -17,7 +17,7 @@ public abstract class DatabaseFixture(ITestOutputHelper output): IAsyncLifetime { protected ITestOutputHelper Output { get; } = output; protected readonly Func TimeoutTokenSource = () => new(Debugger.IsAttached ? TimeSpan.FromHours(1) : TimeSpan.FromSeconds(2)); - protected class TestConsumer(Action log, JsonTypeInfo info): IConsumes where T : class + protected class TestHandler(Action log, JsonTypeInfo info): IHandler where T : class { public async Task Handle(T value) { @@ -67,7 +67,7 @@ protected static async Task InsertPoisoningMessage(string connectionString, stri await command.ExecuteNonQueryAsync(ct); } - protected (TestConsumer consumer, SubscriptionOptionsBuilder subscriptionOptionsBuilder) SetupFor( + protected (TestHandler handler, SubscriptionOptionsBuilder subscriptionOptionsBuilder) SetupFor( string connectionString, string eventsTable, JsonSerializerContext info, @@ -78,13 +78,14 @@ protected static async Task InsertPoisoningMessage(string connectionString, stri { var jsonTypeInfo = info.GetTypeInfo(typeof(T)); ArgumentNullException.ThrowIfNull(jsonTypeInfo); - var consumer = new TestConsumer(log, jsonTypeInfo); + var consumer = new TestHandler(log, jsonTypeInfo); var subscriptionOptionsBuilder = new SubscriptionOptionsBuilder() .WithErrorProcessor(new TestOutErrorProcessor(Output)) + .DataSource(new NpgsqlDataSourceBuilder(connectionString).Build()) .ConnectionString(connectionString) .JsonContext(info) .NamingPolicy(namingPolicy) - .Consumes>(consumer) + .Consumes(consumer) .WithTable(o => o.Name(eventsTable)) .WithPublicationOptions( new PublicationManagement.PublicationSetupOptions(PublicationName: publicationName ?? Randomise("events_pub")) diff --git a/src/Tests/When_Subscription_Already_Exists.cs b/src/Tests/When_Subscription_Already_Exists.cs index 4575b69..71b78c9 100644 --- a/src/Tests/When_Subscription_Already_Exists.cs +++ b/src/Tests/When_Subscription_Already_Exists.cs @@ -46,7 +46,7 @@ await MessageAppender.AppendAsync( var subscription = new Subscription(); await using var subscription1 = subscription.ConfigureAwait(false); - await foreach (var envelope in subscription.Subscribe(_ => subscriptionOptions, null, ct).ConfigureAwait(false)) + await foreach (var envelope in subscription.Subscribe(_ => subscriptionOptions, ct).ConfigureAwait(false)) { Assert.Equal(@expected, ((OkEnvelope)envelope).Value); return; diff --git a/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Empty.cs b/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Empty.cs index fd378d2..3a52e23 100644 --- a/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Empty.cs +++ b/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Empty.cs @@ -1,4 +1,3 @@ -using Blumchen; using Blumchen.Publications; using Blumchen.Serialization; using Blumchen.Subscriptions; @@ -39,7 +38,7 @@ public async Task Read_from_table_using_named_transaction_snapshot() SubscriberContext.Default, sharedNamingPolicy, Output.WriteLine); var subscription = new Subscription(); await using var subscription1 = subscription.ConfigureAwait(false); - await foreach (var envelope in subscription.Subscribe(_ => subscriptionOptions, null, ct).ConfigureAwait(false)) + await foreach (var envelope in subscription.Subscribe(_ => subscriptionOptions, ct).ConfigureAwait(false)) { Assert.Equal(@expected, ((OkEnvelope)envelope).Value); return; diff --git a/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Not_Empty.cs b/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Not_Empty.cs index 6422396..d1029fb 100644 --- a/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Not_Empty.cs +++ b/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Not_Empty.cs @@ -40,7 +40,7 @@ public async Task Read_from_table_using_named_transaction_snapshot() var subscription = new Subscription(); await using var subscription1 = subscription.ConfigureAwait(false); - await foreach (var envelope in subscription.Subscribe(_ => subscriptionOptions, null, ct).ConfigureAwait(false)) + await foreach (var envelope in subscription.Subscribe(_ => subscriptionOptions, ct).ConfigureAwait(false)) { Assert.Equal(@expected, ((OkEnvelope)envelope).Value); return;