From ab10cc18f7d31792033f3f2c9b0dad9255910614 Mon Sep 17 00:00:00 2001 From: giordanol Date: Thu, 4 Jul 2024 08:53:50 +0200 Subject: [PATCH 01/28] FS clean up --- .../LogicalReplicationTest.cs | 96 ------------------- ...tgresOutboxPatternWithCDC.NET.Tests.csproj | 25 ----- .../Usings.cs | 1 - 3 files changed, 122 deletions(-) delete mode 100644 src/PostgresOutboxPatternWithCDC.NET.Tests/LogicalReplicationTest.cs delete mode 100644 src/PostgresOutboxPatternWithCDC.NET.Tests/PostgresOutboxPatternWithCDC.NET.Tests.csproj delete mode 100644 src/PostgresOutboxPatternWithCDC.NET.Tests/Usings.cs diff --git a/src/PostgresOutboxPatternWithCDC.NET.Tests/LogicalReplicationTest.cs b/src/PostgresOutboxPatternWithCDC.NET.Tests/LogicalReplicationTest.cs deleted file mode 100644 index 93195a2..0000000 --- a/src/PostgresOutboxPatternWithCDC.NET.Tests/LogicalReplicationTest.cs +++ /dev/null @@ -1,96 +0,0 @@ -using Commons.Events; -using PostgresOutbox.Events; -using PostgresOutbox.Serialization; -using PostgresOutbox.Subscriptions; -using PostgresOutbox.Subscriptions.Replication; -using Xunit.Abstractions; -using static PostgresOutbox.Subscriptions.Management.PublicationManagement; -using static PostgresOutbox.Subscriptions.Management.ReplicationSlotManagement; - -namespace PostgresOutboxPatternWithCDC.NET.Tests; - -public class LogicalReplicationTest -{ - private readonly ITestOutputHelper testOutputHelper; - - public LogicalReplicationTest(ITestOutputHelper testOutputHelper) - { - this.testOutputHelper = testOutputHelper; - } - - [Fact] - public async Task WALSubscriptionForNewEventsShouldWork() - { - var cancellationTokenSource = new CancellationTokenSource(); - var ct = cancellationTokenSource.Token; - - var eventsTable = await CreateEventsTable(ConnectrionString, ct); - - var subscriptionOptions = new SubscriptionOptions( - ConnectrionString, - new PublicationSetupOptions(Randomise("events_pub"),eventsTable), - new ReplicationSlotSetupOptions(Randomise("events_slot")), - new EventDataMapper() - ); - var subscription = new Subscription(); - - var events = subscription.Subscribe(subscriptionOptions, ct); - - var @event = new UserCreated(Guid.NewGuid(), Guid.NewGuid().ToString()); - await EventsAppender.AppendAsync(eventsTable, @event, ConnectrionString, ct); - - await foreach (var readEvent in events.WithCancellation(ct)) - { - testOutputHelper.WriteLine(JsonSerialization.ToJson(readEvent)); - Assert.Equal(@event, readEvent); - return; - } - } - - [Fact] - public async Task WALSubscriptionForOldEventsShouldWork() - { - var cancellationTokenSource = new CancellationTokenSource(); - var ct = cancellationTokenSource.Token; - - var eventsTable = await CreateEventsTable(ConnectrionString, ct); - - var subscriptionOptions = new SubscriptionOptions( - ConnectrionString, - new PublicationSetupOptions(Randomise("events_pub"),eventsTable), - new ReplicationSlotSetupOptions(Randomise("events_slot")), - new EventDataMapper() - ); - var subscription = new Subscription(); - - var @event = new UserCreated(Guid.NewGuid(), Guid.NewGuid().ToString()); - await EventsAppender.AppendAsync(eventsTable, @event, ConnectrionString, ct); - - var events = subscription.Subscribe(subscriptionOptions, ct); - - await foreach (var readEvent in events) - { - testOutputHelper.WriteLine(JsonSerialization.ToJson(readEvent)); - Assert.Equal(@event, readEvent); - return; - } - } - - private async Task CreateEventsTable( - string connectionString, - CancellationToken ct - ) - { - var tableName = Randomise("events"); - - await EventsTable.Create(connectionString, tableName, ct); - - return tableName; - } - - private static string Randomise(string prefix) => - $"{prefix}_{Guid.NewGuid().ToString().Replace("-", "")}"; - - private const string ConnectrionString = - "PORT = 5432; HOST = localhost; TIMEOUT = 15; POOLING = True; MINPOOLSIZE = 1; MAXPOOLSIZE = 100; COMMANDTIMEOUT = 20; DATABASE = 'postgres'; PASSWORD = 'Password12!'; USER ID = 'postgres'"; -} diff --git a/src/PostgresOutboxPatternWithCDC.NET.Tests/PostgresOutboxPatternWithCDC.NET.Tests.csproj b/src/PostgresOutboxPatternWithCDC.NET.Tests/PostgresOutboxPatternWithCDC.NET.Tests.csproj deleted file mode 100644 index 9e0d56c..0000000 --- a/src/PostgresOutboxPatternWithCDC.NET.Tests/PostgresOutboxPatternWithCDC.NET.Tests.csproj +++ /dev/null @@ -1,25 +0,0 @@ - - - - net8.0 - - - - - - runtime; build; native; contentfiles; analyzers; buildtransitive - all - - - - all - runtime; build; native; contentfiles; analyzers; buildtransitive - - - - - - - - - diff --git a/src/PostgresOutboxPatternWithCDC.NET.Tests/Usings.cs b/src/PostgresOutboxPatternWithCDC.NET.Tests/Usings.cs deleted file mode 100644 index 8c927eb..0000000 --- a/src/PostgresOutboxPatternWithCDC.NET.Tests/Usings.cs +++ /dev/null @@ -1 +0,0 @@ -global using Xunit; \ No newline at end of file From 977f2034cb7c1f8f9e42515a41c70ef647903cdb Mon Sep 17 00:00:00 2001 From: giordanol Date: Thu, 4 Jul 2024 08:53:11 +0200 Subject: [PATCH 02/28] suppress CS1591 (Missing XML comment for publicly visible type or member) at project level --- src/Blumchen/Blumchen.csproj | 10 ++++++++++ src/Blumchen/Database/Run.cs | 2 -- src/Blumchen/MessageTableOptions.cs | 1 - src/Blumchen/Publications/MessageAppender.cs | 1 - .../Publications/PublisherSetupOptionsBuilder.cs | 1 - src/Blumchen/Serialization/IDictionaryExtensions.cs | 1 - src/Blumchen/Serialization/INamingPolicy.cs | 1 - src/Blumchen/Serialization/ITypeResolver.cs | 1 - src/Blumchen/Serialization/JsonSerialization.cs | 1 - src/Blumchen/Serialization/MessageUrnAttribute.cs | 1 - src/Blumchen/Subscriptions/IConsume.cs | 1 - .../Subscriptions/Management/PublicationManagement.cs | 1 - .../Management/ReplicationSlotManagement.cs | 1 - src/Blumchen/Subscriptions/MimeType.cs | 1 - .../Replication/IReplicationDataMapper.cs | 1 - .../ReplicationMessageHandlers/Envelope.cs | 1 - .../Subscriptions/SnapshotReader/SnapshotReader.cs | 1 - src/Blumchen/Subscriptions/Subscription.cs | 1 - .../Subscriptions/SubscriptionOptionsBuilder.cs | 2 +- 19 files changed, 11 insertions(+), 19 deletions(-) diff --git a/src/Blumchen/Blumchen.csproj b/src/Blumchen/Blumchen.csproj index 4e6b52c..0f26919 100644 --- a/src/Blumchen/Blumchen.csproj +++ b/src/Blumchen/Blumchen.csproj @@ -25,6 +25,16 @@ snupkg Blumchen + + + 1591 + + + + + 1591 + + <_Parameter1>Tests diff --git a/src/Blumchen/Database/Run.cs b/src/Blumchen/Database/Run.cs index 4e03b17..f4ccec1 100644 --- a/src/Blumchen/Database/Run.cs +++ b/src/Blumchen/Database/Run.cs @@ -4,8 +4,6 @@ using Blumchen.Subscriptions.ReplicationMessageHandlers; using Npgsql; -#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member - namespace Blumchen.Database; public static class Run diff --git a/src/Blumchen/MessageTableOptions.cs b/src/Blumchen/MessageTableOptions.cs index a7fe0b1..7216108 100644 --- a/src/Blumchen/MessageTableOptions.cs +++ b/src/Blumchen/MessageTableOptions.cs @@ -3,7 +3,6 @@ namespace Blumchen; -#pragma warning disable CS1591 public record TableDescriptorBuilder { private MessageTable TableDescriptor { get; set; } = new(); diff --git a/src/Blumchen/Publications/MessageAppender.cs b/src/Blumchen/Publications/MessageAppender.cs index 623c8b4..c927dfe 100644 --- a/src/Blumchen/Publications/MessageAppender.cs +++ b/src/Blumchen/Publications/MessageAppender.cs @@ -3,7 +3,6 @@ using Npgsql; namespace Blumchen.Publications; -#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member public static class MessageAppender { diff --git a/src/Blumchen/Publications/PublisherSetupOptionsBuilder.cs b/src/Blumchen/Publications/PublisherSetupOptionsBuilder.cs index 90e7792..dbd7095 100644 --- a/src/Blumchen/Publications/PublisherSetupOptionsBuilder.cs +++ b/src/Blumchen/Publications/PublisherSetupOptionsBuilder.cs @@ -5,7 +5,6 @@ namespace Blumchen.Publications; -#pragma warning disable CS1591 public class PublisherSetupOptionsBuilder { private INamingPolicy? _namingPolicy; diff --git a/src/Blumchen/Serialization/IDictionaryExtensions.cs b/src/Blumchen/Serialization/IDictionaryExtensions.cs index f97da92..0017b82 100644 --- a/src/Blumchen/Serialization/IDictionaryExtensions.cs +++ b/src/Blumchen/Serialization/IDictionaryExtensions.cs @@ -1,4 +1,3 @@ -#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member namespace Blumchen.Serialization; diff --git a/src/Blumchen/Serialization/INamingPolicy.cs b/src/Blumchen/Serialization/INamingPolicy.cs index 68c96ef..be1be55 100644 --- a/src/Blumchen/Serialization/INamingPolicy.cs +++ b/src/Blumchen/Serialization/INamingPolicy.cs @@ -1,5 +1,4 @@ namespace Blumchen.Serialization; -#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member public interface INamingPolicy { diff --git a/src/Blumchen/Serialization/ITypeResolver.cs b/src/Blumchen/Serialization/ITypeResolver.cs index 049462b..71e49d3 100644 --- a/src/Blumchen/Serialization/ITypeResolver.cs +++ b/src/Blumchen/Serialization/ITypeResolver.cs @@ -3,7 +3,6 @@ using System.Text.Json.Serialization.Metadata; namespace Blumchen.Serialization; -#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member public interface ITypeResolver { diff --git a/src/Blumchen/Serialization/JsonSerialization.cs b/src/Blumchen/Serialization/JsonSerialization.cs index feb4095..6fdfad0 100644 --- a/src/Blumchen/Serialization/JsonSerialization.cs +++ b/src/Blumchen/Serialization/JsonSerialization.cs @@ -4,7 +4,6 @@ using Blumchen.Streams; namespace Blumchen.Serialization; -#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member public static class JsonSerialization { diff --git a/src/Blumchen/Serialization/MessageUrnAttribute.cs b/src/Blumchen/Serialization/MessageUrnAttribute.cs index 8963e34..11ae685 100644 --- a/src/Blumchen/Serialization/MessageUrnAttribute.cs +++ b/src/Blumchen/Serialization/MessageUrnAttribute.cs @@ -1,7 +1,6 @@ using System.Collections.Concurrent; namespace Blumchen.Serialization; -#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member [AttributeUsage(AttributeTargets.Class | AttributeTargets.Interface)] public class MessageUrnAttribute: diff --git a/src/Blumchen/Subscriptions/IConsume.cs b/src/Blumchen/Subscriptions/IConsume.cs index 4a59c0f..6773455 100644 --- a/src/Blumchen/Subscriptions/IConsume.cs +++ b/src/Blumchen/Subscriptions/IConsume.cs @@ -1,5 +1,4 @@ namespace Blumchen.Subscriptions; -#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member public interface IConsume; diff --git a/src/Blumchen/Subscriptions/Management/PublicationManagement.cs b/src/Blumchen/Subscriptions/Management/PublicationManagement.cs index 64e6033..74750c1 100644 --- a/src/Blumchen/Subscriptions/Management/PublicationManagement.cs +++ b/src/Blumchen/Subscriptions/Management/PublicationManagement.cs @@ -3,7 +3,6 @@ using Npgsql; #pragma warning disable CA2208 -#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member namespace Blumchen.Subscriptions.Management; diff --git a/src/Blumchen/Subscriptions/Management/ReplicationSlotManagement.cs b/src/Blumchen/Subscriptions/Management/ReplicationSlotManagement.cs index 4595dd4..dae42b6 100644 --- a/src/Blumchen/Subscriptions/Management/ReplicationSlotManagement.cs +++ b/src/Blumchen/Subscriptions/Management/ReplicationSlotManagement.cs @@ -5,7 +5,6 @@ namespace Blumchen.Subscriptions.Management; using static ReplicationSlotManagement.CreateReplicationSlotResult; -#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member public static class ReplicationSlotManagement { diff --git a/src/Blumchen/Subscriptions/MimeType.cs b/src/Blumchen/Subscriptions/MimeType.cs index 8bb3ef9..1197908 100644 --- a/src/Blumchen/Subscriptions/MimeType.cs +++ b/src/Blumchen/Subscriptions/MimeType.cs @@ -1,6 +1,5 @@ namespace Blumchen.Subscriptions; -#pragma warning disable CS1591 public abstract record MimeType(string mimeType) { public record Json(): MimeType("application/json"); diff --git a/src/Blumchen/Subscriptions/Replication/IReplicationDataMapper.cs b/src/Blumchen/Subscriptions/Replication/IReplicationDataMapper.cs index 90b75d2..1f74c3a 100644 --- a/src/Blumchen/Subscriptions/Replication/IReplicationDataMapper.cs +++ b/src/Blumchen/Subscriptions/Replication/IReplicationDataMapper.cs @@ -3,7 +3,6 @@ using Npgsql.Replication.PgOutput.Messages; namespace Blumchen.Subscriptions.Replication; -#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member public interface IReplicationDataMapper { diff --git a/src/Blumchen/Subscriptions/ReplicationMessageHandlers/Envelope.cs b/src/Blumchen/Subscriptions/ReplicationMessageHandlers/Envelope.cs index 099a627..aa2e812 100644 --- a/src/Blumchen/Subscriptions/ReplicationMessageHandlers/Envelope.cs +++ b/src/Blumchen/Subscriptions/ReplicationMessageHandlers/Envelope.cs @@ -1,5 +1,4 @@ namespace Blumchen.Subscriptions.ReplicationMessageHandlers; -#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member public interface IEnvelope; diff --git a/src/Blumchen/Subscriptions/SnapshotReader/SnapshotReader.cs b/src/Blumchen/Subscriptions/SnapshotReader/SnapshotReader.cs index 5603e79..eaaffe4 100644 --- a/src/Blumchen/Subscriptions/SnapshotReader/SnapshotReader.cs +++ b/src/Blumchen/Subscriptions/SnapshotReader/SnapshotReader.cs @@ -5,7 +5,6 @@ using Npgsql; namespace Blumchen.Subscriptions.SnapshotReader; -#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member public static class SnapshotReader { diff --git a/src/Blumchen/Subscriptions/Subscription.cs b/src/Blumchen/Subscriptions/Subscription.cs index 95854af..fa28b2e 100644 --- a/src/Blumchen/Subscriptions/Subscription.cs +++ b/src/Blumchen/Subscriptions/Subscription.cs @@ -12,7 +12,6 @@ using Npgsql.Replication.PgOutput.Messages; namespace Blumchen.Subscriptions; -#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member using static PublicationManagement; using static ReplicationSlotManagement; diff --git a/src/Blumchen/Subscriptions/SubscriptionOptionsBuilder.cs b/src/Blumchen/Subscriptions/SubscriptionOptionsBuilder.cs index 13a97d8..fb7e5be 100644 --- a/src/Blumchen/Subscriptions/SubscriptionOptionsBuilder.cs +++ b/src/Blumchen/Subscriptions/SubscriptionOptionsBuilder.cs @@ -5,7 +5,7 @@ using System.Text.Json.Serialization; namespace Blumchen.Subscriptions; -#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member + public sealed class SubscriptionOptionsBuilder { private static string? _connectionString; From ad74d3c5a78b46438580c2e7533a0364fb307386 Mon Sep 17 00:00:00 2001 From: giordanol Date: Thu, 4 Jul 2024 14:58:27 +0200 Subject: [PATCH 03/28] made UseTable() optional --- src/Blumchen/Subscriptions/SubscriptionOptionsBuilder.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Blumchen/Subscriptions/SubscriptionOptionsBuilder.cs b/src/Blumchen/Subscriptions/SubscriptionOptionsBuilder.cs index fb7e5be..90cbdd0 100644 --- a/src/Blumchen/Subscriptions/SubscriptionOptionsBuilder.cs +++ b/src/Blumchen/Subscriptions/SubscriptionOptionsBuilder.cs @@ -90,9 +90,9 @@ public SubscriptionOptionsBuilder WithErrorProcessor(IErrorProcessor? errorProce internal ISubscriptionOptions Build() { + _messageTable ??= TableDescriptorBuilder.Build(); ArgumentNullException.ThrowIfNull(_connectionString); ArgumentNullException.ThrowIfNull(_jsonSerializerContext); - ArgumentNullException.ThrowIfNull(_messageTable); var typeResolver = new JsonTypeResolver(_jsonSerializerContext, _namingPolicy); foreach (var type in _registry.Keys) typeResolver.WhiteList(type); From b59d3869d1c4c6fa4c5938d128356d9732de815f Mon Sep 17 00:00:00 2001 From: giordanol Date: Thu, 4 Jul 2024 15:11:02 +0200 Subject: [PATCH 04/28] enforce AOT --- src/Subscriber/Subscriber.csproj | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/Subscriber/Subscriber.csproj b/src/Subscriber/Subscriber.csproj index d5d5273..7878764 100644 --- a/src/Subscriber/Subscriber.csproj +++ b/src/Subscriber/Subscriber.csproj @@ -5,7 +5,8 @@ net8.0 enable enable - true + True + true false From 71c1f1419c20783bca8384d6dd573475535487e7 Mon Sep 17 00:00:00 2001 From: giordanol Date: Thu, 4 Jul 2024 15:10:33 +0200 Subject: [PATCH 05/28] rename IConsumes to IHandles --- src/Blumchen/Subscriptions/IConsume.cs | 8 -------- src/Blumchen/Subscriptions/IHandler.cs | 8 ++++++++ src/Blumchen/Subscriptions/ISubscriptionOptions.cs | 4 ++-- src/Blumchen/Subscriptions/Subscription.cs | 12 ++++++------ .../Subscriptions/SubscriptionOptionsBuilder.cs | 10 +++++----- src/Subscriber/Program.cs | 8 ++++---- src/Tests/DatabaseFixture.cs | 8 ++++---- 7 files changed, 29 insertions(+), 29 deletions(-) delete mode 100644 src/Blumchen/Subscriptions/IConsume.cs create mode 100644 src/Blumchen/Subscriptions/IHandler.cs diff --git a/src/Blumchen/Subscriptions/IConsume.cs b/src/Blumchen/Subscriptions/IConsume.cs deleted file mode 100644 index 6773455..0000000 --- a/src/Blumchen/Subscriptions/IConsume.cs +++ /dev/null @@ -1,8 +0,0 @@ -namespace Blumchen.Subscriptions; - -public interface IConsume; - -public interface IConsumes: IConsume where T : class -{ - Task Handle(T value); -} diff --git a/src/Blumchen/Subscriptions/IHandler.cs b/src/Blumchen/Subscriptions/IHandler.cs new file mode 100644 index 0000000..b722f25 --- /dev/null +++ b/src/Blumchen/Subscriptions/IHandler.cs @@ -0,0 +1,8 @@ +namespace Blumchen.Subscriptions; + +public interface IHandler; + +public interface IHandler: IHandler where T : class +{ + Task Handle(T value); +} diff --git a/src/Blumchen/Subscriptions/ISubscriptionOptions.cs b/src/Blumchen/Subscriptions/ISubscriptionOptions.cs index 1be177c..79894e1 100644 --- a/src/Blumchen/Subscriptions/ISubscriptionOptions.cs +++ b/src/Blumchen/Subscriptions/ISubscriptionOptions.cs @@ -19,7 +19,7 @@ void Deconstruct( out ReplicationSlotSetupOptions replicationSlotSetupOptions, out IErrorProcessor errorProcessor, out IReplicationDataMapper dataMapper, - out Dictionary registry); + out Dictionary registry); } internal record SubscriptionOptions( @@ -28,4 +28,4 @@ internal record SubscriptionOptions( ReplicationSlotSetupOptions ReplicationOptions, IErrorProcessor ErrorProcessor, IReplicationDataMapper DataMapper, - Dictionary Registry): ISubscriptionOptions; + Dictionary Registry): ISubscriptionOptions; diff --git a/src/Blumchen/Subscriptions/Subscription.cs b/src/Blumchen/Subscriptions/Subscription.cs index fa28b2e..3c6ee7c 100644 --- a/src/Blumchen/Subscriptions/Subscription.cs +++ b/src/Blumchen/Subscriptions/Subscription.cs @@ -88,7 +88,7 @@ public async IAsyncEnumerable Subscribe( private static async IAsyncEnumerable ProcessEnvelope( IEnvelope envelope, - Dictionary registry, + Dictionary registry, IErrorProcessor errorProcessor ) where T:class { @@ -109,14 +109,14 @@ IErrorProcessor errorProcessor } } - private static readonly Dictionary Cache = []; + private static readonly Dictionary Cache = []; - private static (IConsume consumer, MethodInfo methodInfo) Memoize + private static (IHandler consumer, MethodInfo methodInfo) Memoize ( - Dictionary registry, + Dictionary registry, Type objType, - Func, Type, (IConsume consumer, MethodInfo methodInfo)> func + Func, Type, (IHandler consumer, MethodInfo methodInfo)> func ) { if (!Cache.TryGetValue(objType, out var entry)) @@ -124,7 +124,7 @@ private static (IConsume consumer, MethodInfo methodInfo) Memoize Cache[objType] = entry; return entry; } - private static (IConsume consumer, MethodInfo methodInfo) Consumer(Dictionary registry, Type objType) + private static (IHandler consumer, MethodInfo methodInfo) Consumer(Dictionary registry, Type objType) { var consumer = registry[objType] ?? throw new NotSupportedException($"Unregistered type for {objType.AssemblyQualifiedName}"); var methodInfos = consumer.GetType().GetMethods(BindingFlags.Instance|BindingFlags.Public); diff --git a/src/Blumchen/Subscriptions/SubscriptionOptionsBuilder.cs b/src/Blumchen/Subscriptions/SubscriptionOptionsBuilder.cs index 90cbdd0..9b7e7c5 100644 --- a/src/Blumchen/Subscriptions/SubscriptionOptionsBuilder.cs +++ b/src/Blumchen/Subscriptions/SubscriptionOptionsBuilder.cs @@ -12,7 +12,7 @@ public sealed class SubscriptionOptionsBuilder private static PublicationManagement.PublicationSetupOptions _publicationSetupOptions; private static ReplicationSlotManagement.ReplicationSlotSetupOptions? _replicationSlotSetupOptions; private static IReplicationDataMapper? _dataMapper; - private readonly Dictionary _registry = []; + private readonly Dictionary _registry = []; private IErrorProcessor? _errorProcessor; private INamingPolicy? _namingPolicy; private JsonSerializerContext? _jsonSerializerContext; @@ -74,10 +74,10 @@ public SubscriptionOptionsBuilder WithReplicationOptions(ReplicationSlotManageme } [UsedImplicitly] - public SubscriptionOptionsBuilder Consumes(TU consumer) where T : class - where TU : class, IConsumes + public SubscriptionOptionsBuilder Handles(TU handler) where T : class + where TU : class, IHandler { - _registry.TryAdd(typeof(T), consumer); + _registry.TryAdd(typeof(T), handler); return this; } @@ -119,7 +119,7 @@ static void Ensure(Func> evalFn, string formattedMsg) } } -public class ObjectTracingConsumer: IConsumes +public class ObjectTracingConsumer: IHandler { private static ulong _counter = 0; public Task Handle(object value) diff --git a/src/Subscriber/Program.cs b/src/Subscriber/Program.cs index c023ac0..ca08c10 100644 --- a/src/Subscriber/Program.cs +++ b/src/Subscriber/Program.cs @@ -30,8 +30,8 @@ ) .NamingPolicy(new AttributeNamingPolicy()) .JsonContext(SourceGenerationContext.Default) - .Consumes(consumer) - .Consumes(consumer), LoggerFactory.Create(builder => builder.AddConsole()), ct + .Handles(consumer) + .Handles(consumer), LoggerFactory.Create(builder => builder.AddConsole()), ct ).GetAsyncEnumerator(ct); await using var cursor1 = cursor.ConfigureAwait(false); while (await cursor.MoveNextAsync().ConfigureAwait(false) && !ct.IsCancellationRequested); @@ -46,8 +46,8 @@ namespace Subscriber { internal class Consumer: - IConsumes, - IConsumes + IHandler, + IHandler { public Task Handle(UserCreatedContract value) => Console.Out.WriteLineAsync(JsonSerialization.ToJson(value, SourceGenerationContext.Default.UserCreatedContract)); public Task Handle(UserDeletedContract value) => Console.Out.WriteLineAsync(JsonSerialization.ToJson(value, SourceGenerationContext.Default.UserDeletedContract)); diff --git a/src/Tests/DatabaseFixture.cs b/src/Tests/DatabaseFixture.cs index fc71e14..9bc2e9a 100644 --- a/src/Tests/DatabaseFixture.cs +++ b/src/Tests/DatabaseFixture.cs @@ -17,7 +17,7 @@ public abstract class DatabaseFixture(ITestOutputHelper output): IAsyncLifetime { protected ITestOutputHelper Output { get; } = output; protected readonly Func TimeoutTokenSource = () => new(Debugger.IsAttached ? TimeSpan.FromHours(1) : TimeSpan.FromSeconds(2)); - protected class TestConsumer(Action log, JsonTypeInfo info): IConsumes where T : class + protected class TestHandler(Action log, JsonTypeInfo info): IHandler where T : class { public async Task Handle(T value) { @@ -67,7 +67,7 @@ protected static async Task InsertPoisoningMessage(string connectionString, stri await command.ExecuteNonQueryAsync(ct); } - protected (TestConsumer consumer, SubscriptionOptionsBuilder subscriptionOptionsBuilder) SetupFor( + protected (TestHandler handler, SubscriptionOptionsBuilder subscriptionOptionsBuilder) SetupFor( string connectionString, string eventsTable, JsonSerializerContext info, @@ -78,13 +78,13 @@ protected static async Task InsertPoisoningMessage(string connectionString, stri { var jsonTypeInfo = info.GetTypeInfo(typeof(T)); ArgumentNullException.ThrowIfNull(jsonTypeInfo); - var consumer = new TestConsumer(log, jsonTypeInfo); + var consumer = new TestHandler(log, jsonTypeInfo); var subscriptionOptionsBuilder = new SubscriptionOptionsBuilder() .WithErrorProcessor(new TestOutErrorProcessor(Output)) .ConnectionString(connectionString) .JsonContext(info) .NamingPolicy(namingPolicy) - .Consumes>(consumer) + .Handles>(consumer) .WithTable(o => o.Name(eventsTable)) .WithPublicationOptions( new PublicationManagement.PublicationSetupOptions(PublicationName: publicationName ?? Randomise("events_pub")) From 86656eced34a2beb19ee4a1eff52835f998609fc Mon Sep 17 00:00:00 2001 From: giordanol Date: Thu, 4 Jul 2024 15:14:23 +0200 Subject: [PATCH 06/28] Use double quote --- .../Management/PublicationManagement.cs | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/src/Blumchen/Subscriptions/Management/PublicationManagement.cs b/src/Blumchen/Subscriptions/Management/PublicationManagement.cs index 74750c1..c2b6f6c 100644 --- a/src/Blumchen/Subscriptions/Management/PublicationManagement.cs +++ b/src/Blumchen/Subscriptions/Management/PublicationManagement.cs @@ -69,14 +69,11 @@ internal static Task CreatePublication( ISet eventTypes, CancellationToken ct ) { + var sql = $"CREATE PUBLICATION \"{publicationName}\" FOR TABLE {tableName} {{0}} WITH (publish = 'insert');"; return eventTypes.Count switch { - 0 => Execute(dataSource, $"CREATE PUBLICATION {publicationName} FOR TABLE {tableName} WITH (publish = 'insert');", - ct - ), - _ => Execute(dataSource, $"CREATE PUBLICATION {publicationName} FOR TABLE {tableName} WHERE ({PublicationFilter(eventTypes)}) WITH (publish = 'insert');", - ct - ) + 0 => Execute(dataSource, string.Format(sql,string.Empty), ct), + _ => Execute(dataSource, string.Format(sql, $"WHERE ({PublicationFilter(eventTypes)})"), ct) }; static string PublicationFilter(ICollection input) => string.Join(" OR ", input.Select(s => $"message_type = '{s}'")); } @@ -128,8 +125,7 @@ private static Task PublicationExists( this NpgsqlDataSource dataSource, string publicationName, CancellationToken ct - ) => - dataSource.Exists("pg_publication", "pubname = $1", [publicationName], ct); + ) => dataSource.Exists("pg_publication", "pubname = $1", [publicationName], ct); public abstract record SetupPublicationResult { From 438cf21997e6021342b7cf580ea6f8151352ad0d Mon Sep 17 00:00:00 2001 From: giordanol Date: Thu, 4 Jul 2024 15:15:13 +0200 Subject: [PATCH 07/28] use ILKE. Replication slot name is forced to lcase --- .../Management/ReplicationSlotManagement.cs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/Blumchen/Subscriptions/Management/ReplicationSlotManagement.cs b/src/Blumchen/Subscriptions/Management/ReplicationSlotManagement.cs index dae42b6..adeb6a8 100644 --- a/src/Blumchen/Subscriptions/Management/ReplicationSlotManagement.cs +++ b/src/Blumchen/Subscriptions/Management/ReplicationSlotManagement.cs @@ -9,6 +9,12 @@ namespace Blumchen.Subscriptions.Management; public static class ReplicationSlotManagement { #pragma warning disable CA2208 + private static Task ReplicationSlotExists( + this NpgsqlDataSource dataSource, + string slotName, + CancellationToken ct + ) => dataSource.Exists("pg_replication_slots", "slot_name ILIKE $1", [slotName], ct); + public static async Task SetupReplicationSlot( this NpgsqlDataSource dataSource, LogicalReplicationConnection connection, @@ -53,18 +59,12 @@ static async Task Create( } } - private static Task ReplicationSlotExists( - this NpgsqlDataSource dataSource, - string slotName, - CancellationToken ct - ) => dataSource.Exists("pg_replication_slots", "slot_name = $1", [slotName], ct); - public record ReplicationSlotSetupOptions( string SlotName = $"{TableDescriptorBuilder.MessageTable.DefaultName}_slot", Subscription.CreateStyle CreateStyle = Subscription.CreateStyle.WhenNotExists, - bool Binary = false //https://www.postgresql.org/docs/current/sql-createsubscription.html#SQL-CREATESUBSCRIPTION-WITH-BINARY + bool Binary = + false //https://www.postgresql.org/docs/current/sql-createsubscription.html#SQL-CREATESUBSCRIPTION-WITH-BINARY ); - public abstract record CreateReplicationSlotResult { public record None: CreateReplicationSlotResult; From b24c01f21feba7c1d41ad206992db6c914ea632b Mon Sep 17 00:00:00 2001 From: giordanol Date: Thu, 4 Jul 2024 15:15:37 +0200 Subject: [PATCH 08/28] unused directive --- src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Empty.cs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Empty.cs b/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Empty.cs index fd378d2..71e78e4 100644 --- a/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Empty.cs +++ b/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Empty.cs @@ -1,4 +1,3 @@ -using Blumchen; using Blumchen.Publications; using Blumchen.Serialization; using Blumchen.Subscriptions; From b024c6fe2161e1d532a418e92a2fe3bb5a14a686 Mon Sep 17 00:00:00 2001 From: giordanol Date: Thu, 4 Jul 2024 16:36:11 +0200 Subject: [PATCH 09/28] remove static variables to enable multiple instances on same process --- src/Blumchen/Subscriptions/Subscription.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/Blumchen/Subscriptions/Subscription.cs b/src/Blumchen/Subscriptions/Subscription.cs index 3c6ee7c..57dd724 100644 --- a/src/Blumchen/Subscriptions/Subscription.cs +++ b/src/Blumchen/Subscriptions/Subscription.cs @@ -25,8 +25,8 @@ public enum CreateStyle AlwaysRecreate, Never } - private static LogicalReplicationConnection? _connection; - private static readonly SubscriptionOptionsBuilder Builder = new(); + private LogicalReplicationConnection? _connection; + private readonly SubscriptionOptionsBuilder _builder = new(); private ISubscriptionOptions? _options; public async IAsyncEnumerable Subscribe( Func builder, @@ -34,7 +34,7 @@ public async IAsyncEnumerable Subscribe( [EnumeratorCancellation] CancellationToken ct = default ) { - _options = builder(Builder).Build(); + _options = builder(_builder).Build(); var (connectionString, publicationSetupOptions, replicationSlotSetupOptions, errorProcessor, replicationDataMapper, registry) = _options; var dataSourceBuilder = new NpgsqlDataSourceBuilder(connectionString); dataSourceBuilder.UseLoggerFactory(loggerFactory); From 8cc1feddd39736e0d73d342f786fd2a5946942f5 Mon Sep 17 00:00:00 2001 From: giordanol Date: Fri, 5 Jul 2024 00:50:15 +0200 Subject: [PATCH 10/28] Added DependencyIbjection project --- Blumchen.sln | 6 ++ .../Blumchen.DependencyInjection.csproj | 49 ++++++++++++++ .../Configuration/DatabaseOptions.cs | 2 + .../Workers/ServiceCollectionExtensions.cs | 13 ++++ .../Workers/Worker.cs | 64 +++++++++++++++++++ 5 files changed, 134 insertions(+) create mode 100644 src/Blumchen.DependencyInjection/Blumchen.DependencyInjection.csproj create mode 100644 src/Blumchen.DependencyInjection/Configuration/DatabaseOptions.cs create mode 100644 src/Blumchen.DependencyInjection/Workers/ServiceCollectionExtensions.cs create mode 100644 src/Blumchen.DependencyInjection/Workers/Worker.cs diff --git a/Blumchen.sln b/Blumchen.sln index a5020ed..6024139 100644 --- a/Blumchen.sln +++ b/Blumchen.sln @@ -43,6 +43,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "postgres", "postgres", "{8A EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "demo", "demo", "{A4044484-FE08-4399-8239-14AABFA30AD7}" EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Blumchen.DependencyInjection", "src\Blumchen.DependencyInjection\Blumchen.DependencyInjection.csproj", "{A07167E3-4CF7-40EF-8E55-A37A0F57B89D}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -69,6 +71,10 @@ Global {2BBDA071-FB1C-4D62-A954-B22EA6B1C738}.Debug|Any CPU.Build.0 = Debug|Any CPU {2BBDA071-FB1C-4D62-A954-B22EA6B1C738}.Release|Any CPU.ActiveCfg = Release|Any CPU {2BBDA071-FB1C-4D62-A954-B22EA6B1C738}.Release|Any CPU.Build.0 = Release|Any CPU + {A07167E3-4CF7-40EF-8E55-A37A0F57B89D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {A07167E3-4CF7-40EF-8E55-A37A0F57B89D}.Debug|Any CPU.Build.0 = Debug|Any CPU + {A07167E3-4CF7-40EF-8E55-A37A0F57B89D}.Release|Any CPU.ActiveCfg = Release|Any CPU + {A07167E3-4CF7-40EF-8E55-A37A0F57B89D}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/src/Blumchen.DependencyInjection/Blumchen.DependencyInjection.csproj b/src/Blumchen.DependencyInjection/Blumchen.DependencyInjection.csproj new file mode 100644 index 0000000..a808d2d --- /dev/null +++ b/src/Blumchen.DependencyInjection/Blumchen.DependencyInjection.csproj @@ -0,0 +1,49 @@ + + + + 0.1.0 + net8.0 + true + true + true + false + true + true + true + true + 12.0 + Oskar Dudycz + + https://github.com/event-driven-io/Blumchen + MIT + https://github.com/event-driven-io/Blumchen.git + true + Blumchen + true + true + true + snupkg + Blumchen + true + enable + enable + + + + 1591 + + + + 1591 + + + + + + + + + + + + diff --git a/src/Blumchen.DependencyInjection/Configuration/DatabaseOptions.cs b/src/Blumchen.DependencyInjection/Configuration/DatabaseOptions.cs new file mode 100644 index 0000000..1be0098 --- /dev/null +++ b/src/Blumchen.DependencyInjection/Configuration/DatabaseOptions.cs @@ -0,0 +1,2 @@ +namespace Blumchen.Configuration; +public record DatabaseOptions(string ConnectionString); diff --git a/src/Blumchen.DependencyInjection/Workers/ServiceCollectionExtensions.cs b/src/Blumchen.DependencyInjection/Workers/ServiceCollectionExtensions.cs new file mode 100644 index 0000000..ec4e34c --- /dev/null +++ b/src/Blumchen.DependencyInjection/Workers/ServiceCollectionExtensions.cs @@ -0,0 +1,13 @@ +using Microsoft.Extensions.DependencyInjection; +#pragma warning disable IL2091 + +namespace Blumchen.Workers; + +public static class ServiceCollectionExtensions +{ + public static IServiceCollection AddBlumchen(this IServiceCollection service, T? instance = default) + where T : Worker where TU : class => + instance is null + ? service.AddHostedService() + : service.AddHostedService(_=>instance); +} diff --git a/src/Blumchen.DependencyInjection/Workers/Worker.cs b/src/Blumchen.DependencyInjection/Workers/Worker.cs new file mode 100644 index 0000000..1d06a31 --- /dev/null +++ b/src/Blumchen.DependencyInjection/Workers/Worker.cs @@ -0,0 +1,64 @@ +using System.Collections.Concurrent; +using System.Text.Json.Serialization; +using Blumchen.Configuration; +using Blumchen.Serialization; +using Blumchen.Subscriptions; +using Blumchen.Subscriptions.Management; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Polly; + + +namespace Blumchen.Workers; + +public abstract class Worker( + DatabaseOptions databaseOptions, + IHandler handler, + JsonSerializerContext jsonSerializerContext, + IErrorProcessor errorProcessor, + ResiliencePipeline pipeline, + INamingPolicy namingPolicy, + PublicationManagement.PublicationSetupOptions publicationSetupOptions, + ReplicationSlotManagement.ReplicationSlotSetupOptions replicationSlotSetupOptions, + ILoggerFactory loggerFactory): BackgroundService where T : class +{ + private readonly ILogger> _logger = loggerFactory.CreateLogger>(); + private string WorkerName { get; } = $"{nameof(Worker)}<{typeof(T).Name}>"; + private static readonly ConcurrentDictionary> _actions = new(StringComparer.OrdinalIgnoreCase); + private static void Notify(ILogger logger, LogLevel level, string template, params object[] parameters) + { + static Action LoggerAction(LogLevel ll, bool enabled) => + (ll, enabled) switch + { + (LogLevel.Information, true) => (logger, template, parameters) => logger.LogInformation(template, parameters), + (LogLevel.Debug, true) => (logger, template, parameters) => logger.LogDebug(template, parameters), + (_, _) => (_, __, ___) => { } + }; + _actions.GetOrAdd(template,s => LoggerAction(level, logger.IsEnabled(level)))(logger, template, parameters); + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + await pipeline.ExecuteAsync(async token => + { + await using var subscription = new Subscription(); + await using var cursor = subscription.Subscribe(builder => + builder + .ConnectionString(databaseOptions.ConnectionString) + .WithErrorProcessor(errorProcessor) + .Handles>(handler) + .NamingPolicy(namingPolicy) + .JsonContext(jsonSerializerContext) + .WithPublicationOptions(publicationSetupOptions) + .WithReplicationOptions(replicationSlotSetupOptions) + , ct: token, loggerFactory: loggerFactory).GetAsyncEnumerator(token); + Notify(_logger, LogLevel.Information,"{WorkerName} started", WorkerName); + while (await cursor.MoveNextAsync().ConfigureAwait(false) && !token.IsCancellationRequested) + Notify(_logger, LogLevel.Debug, "{cursor.Current} processed", cursor.Current); + + }, stoppingToken).ConfigureAwait(false); + Notify(_logger, LogLevel.Information, "{WorkerName} stopped", WorkerName); + return; + } + +} From 380d06aed86fc5b832295553e153b00394b196c1 Mon Sep 17 00:00:00 2001 From: giordanol Date: Fri, 5 Jul 2024 00:52:04 +0200 Subject: [PATCH 11/28] Added demo projrct for DI --- Blumchen.sln | 7 +++ README.md | 2 +- src/SubscriberWorker/Contracts.cs | 22 ++++++++ src/SubscriberWorker/Handler.cs | 25 +++++++++ src/SubscriberWorker/Program.cs | 59 ++++++++++++++++++++ src/SubscriberWorker/SubscriberWorker.cs | 28 ++++++++++ src/SubscriberWorker/SubscriberWorker.csproj | 23 ++++++++ 7 files changed, 165 insertions(+), 1 deletion(-) create mode 100644 src/SubscriberWorker/Contracts.cs create mode 100644 src/SubscriberWorker/Handler.cs create mode 100644 src/SubscriberWorker/Program.cs create mode 100644 src/SubscriberWorker/SubscriberWorker.cs create mode 100644 src/SubscriberWorker/SubscriberWorker.csproj diff --git a/Blumchen.sln b/Blumchen.sln index 6024139..fd99d48 100644 --- a/Blumchen.sln +++ b/Blumchen.sln @@ -45,6 +45,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "demo", "demo", "{A4044484-F EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Blumchen.DependencyInjection", "src\Blumchen.DependencyInjection\Blumchen.DependencyInjection.csproj", "{A07167E3-4CF7-40EF-8E55-A37A0F57B89D}" EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SubscriberWorker", "src\SubscriberWorker\SubscriberWorker.csproj", "{DB58DB36-0366-4ABA-BC06-FCA9BB10EB92}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -75,6 +77,10 @@ Global {A07167E3-4CF7-40EF-8E55-A37A0F57B89D}.Debug|Any CPU.Build.0 = Debug|Any CPU {A07167E3-4CF7-40EF-8E55-A37A0F57B89D}.Release|Any CPU.ActiveCfg = Release|Any CPU {A07167E3-4CF7-40EF-8E55-A37A0F57B89D}.Release|Any CPU.Build.0 = Release|Any CPU + {DB58DB36-0366-4ABA-BC06-FCA9BB10EB92}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {DB58DB36-0366-4ABA-BC06-FCA9BB10EB92}.Debug|Any CPU.Build.0 = Debug|Any CPU + {DB58DB36-0366-4ABA-BC06-FCA9BB10EB92}.Release|Any CPU.ActiveCfg = Release|Any CPU + {DB58DB36-0366-4ABA-BC06-FCA9BB10EB92}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -85,6 +91,7 @@ Global {F81E2D5B-FC59-4396-A911-56BE65E4FE80} = {A4044484-FE08-4399-8239-14AABFA30AD7} {C050E9E8-3FB6-4581-953F-31826E385FB4} = {CD59A1A0-F40D-4047-87A3-66C0F1519FA5} {8AAAA344-B5FD-48D9-B2BA-379E374448D4} = {CD59A1A0-F40D-4047-87A3-66C0F1519FA5} + {DB58DB36-0366-4ABA-BC06-FCA9BB10EB92} = {A4044484-FE08-4399-8239-14AABFA30AD7} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {9A868C51-0460-4700-AF33-E1A921192614} diff --git a/README.md b/README.md index 48856b2..36fd616 100644 --- a/README.md +++ b/README.md @@ -18,7 +18,7 @@ Main logic is placed in [EventsSubscription](./src/Blumchen/Subscriptions/Subscr ```shell docker-compose up ``` -2. Run(order doesn't matter) Publisher and Subscriber apps, under 'demo' folder, from vs-studio, and follow Publisher instructions. +2. Run(order doesn't matter) Publisher and (Subscriber or SubscriberWorker) apps, under 'demo' folder, from vs-studio, and follow Publisher instructions. ## Testing (against default docker instance) diff --git a/src/SubscriberWorker/Contracts.cs b/src/SubscriberWorker/Contracts.cs new file mode 100644 index 0000000..6fc3d0d --- /dev/null +++ b/src/SubscriberWorker/Contracts.cs @@ -0,0 +1,22 @@ +using System.Text.Json.Serialization; +using Blumchen.Serialization; + +namespace SubscriberWorker +{ + [MessageUrn("user-created:v1")] + public record UserCreatedContract( + Guid Id, + string Name + ); + + [MessageUrn("user-deleted:v1")] + public record UserDeletedContract( + Guid Id, + string Name + ); + + [JsonSourceGenerationOptions(WriteIndented = true)] + [JsonSerializable(typeof(UserCreatedContract))] + [JsonSerializable(typeof(UserDeletedContract))] + internal partial class SourceGenerationContext: JsonSerializerContext; +} diff --git a/src/SubscriberWorker/Handler.cs b/src/SubscriberWorker/Handler.cs new file mode 100644 index 0000000..95c003c --- /dev/null +++ b/src/SubscriberWorker/Handler.cs @@ -0,0 +1,25 @@ +using Blumchen.Subscriptions; +using Microsoft.Extensions.Logging; +#pragma warning disable CS9113 // Parameter is unread. + +namespace SubscriberWorker; + + +public class Handler(ILoggerFactory loggerFactory): IHandler where T : class +{ + private readonly ILogger _logger = loggerFactory.CreateLogger>(); + private Task ReportSuccess(int count) + { + if(_logger.IsEnabled(LogLevel.Debug)) + _logger.LogDebug($"Read #{count} messages {typeof(T).FullName}"); + return Task.CompletedTask; + } + + private int _counter; + private int _completed; + public Task Handle(T value) + => Interlocked.Increment(ref _counter) % 10 == 0 + //Simulating some exception on out of process dependencies + ? Task.FromException(new Exception($"Error on publishing {nameof(T)}")) + : ReportSuccess(Interlocked.Increment(ref _completed)); +} diff --git a/src/SubscriberWorker/Program.cs b/src/SubscriberWorker/Program.cs new file mode 100644 index 0000000..6f224e4 --- /dev/null +++ b/src/SubscriberWorker/Program.cs @@ -0,0 +1,59 @@ +using System.Text.Json.Serialization; +using Blumchen.Configuration; +using Blumchen.Serialization; +using Blumchen.Subscriptions; +using Blumchen.Workers; +using Commons; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Polly.Retry; +using Polly; +using SubscriberWorker; + + +#pragma warning disable CS8601 // Possible null reference assignment. +Console.Title = typeof(Program).Assembly.GetName().Name; +#pragma warning restore CS8601 // Possible null reference assignment. + + + +AppDomain.CurrentDomain.UnhandledException += (_, e) => Console.Out.WriteLine(e.ExceptionObject.ToString()); +TaskScheduler.UnobservedTaskException += (_, e) => Console.Out.WriteLine(e.Exception.ToString()); + +var cancellationTokenSource = new CancellationTokenSource(); +var builder = Host.CreateApplicationBuilder(args); + +builder.Services + .AddBlumchen, UserCreatedContract>() + .AddSingleton, Handler>() + .AddBlumchen, UserDeletedContract>() + .AddSingleton, Handler>() + + .AddSingleton() + .AddSingleton() + .AddSingleton() + .AddSingleton(new DatabaseOptions(Settings.ConnectionString)) + .AddResiliencePipeline("default",(pipelineBuilder,context) => + pipelineBuilder + .AddRetry(new RetryStrategyOptions + { + BackoffType = DelayBackoffType.Constant, + Delay = TimeSpan.FromSeconds(5), + MaxRetryAttempts = int.MaxValue + }).Build()) + .AddLogging(loggingBuilder => + { + loggingBuilder + .AddFilter("Microsoft", LogLevel.Warning) + .AddFilter("System", LogLevel.Warning) + .AddFilter("Npgsql", LogLevel.Information) + .AddFilter("Blumchen", LogLevel.Debug) + .AddFilter("SubscriberWorker", LogLevel.Debug) + .AddSimpleConsole(); + }); + +await builder + .Build() + .RunAsync(cancellationTokenSource.Token) + .ConfigureAwait(false); diff --git a/src/SubscriberWorker/SubscriberWorker.cs b/src/SubscriberWorker/SubscriberWorker.cs new file mode 100644 index 0000000..5b97440 --- /dev/null +++ b/src/SubscriberWorker/SubscriberWorker.cs @@ -0,0 +1,28 @@ +using System.Text.Json.Serialization; +using Blumchen.Configuration; +using Blumchen.Serialization; +using Blumchen.Subscriptions; +using Blumchen.Subscriptions.Management; +using Blumchen.Workers; +using Microsoft.Extensions.Logging; +using Polly.Registry; +// ReSharper disable ClassNeverInstantiated.Global + +namespace SubscriberWorker; +public class SubscriberWorker( + DatabaseOptions databaseOptions, + IHandler handler, + JsonSerializerContext jsonSerializerContext, + ResiliencePipelineProvider pipelineProvider, + INamingPolicy namingPolicy, + IErrorProcessor errorProcessor, + ILoggerFactory loggerFactory +): Worker(databaseOptions + , handler + , jsonSerializerContext + , errorProcessor + , pipelineProvider.GetPipeline("default") + , namingPolicy + , new PublicationManagement.PublicationSetupOptions($"{typeof(T).Name}_pub") + , new ReplicationSlotManagement.ReplicationSlotSetupOptions($"{typeof(T).Name}_slot") + , loggerFactory) where T : class; diff --git a/src/SubscriberWorker/SubscriberWorker.csproj b/src/SubscriberWorker/SubscriberWorker.csproj new file mode 100644 index 0000000..cfe99a0 --- /dev/null +++ b/src/SubscriberWorker/SubscriberWorker.csproj @@ -0,0 +1,23 @@ + + + + Exe + net8.0 + enable + enable + true + true + false + + + + + + + + + + + + + From acf5f4dcd1ed1410f2a2324fe9f805820fec66cb Mon Sep 17 00:00:00 2001 From: giordanol Date: Fri, 5 Jul 2024 10:22:49 +0200 Subject: [PATCH 12/28] Enhanced logging --- src/Publisher/Program.cs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/Publisher/Program.cs b/src/Publisher/Program.cs index dae2987..adc4a9a 100644 --- a/src/Publisher/Program.cs +++ b/src/Publisher/Program.cs @@ -22,7 +22,7 @@ if (line != null && int.TryParse(line, out var result)) { var cts = new CancellationTokenSource(); - + var messages = result / 3; var ct = cts.Token; var connection = new NpgsqlConnection(Settings.ConnectionString); await using var connection1 = connection.ConfigureAwait(false); @@ -36,6 +36,9 @@ 1 => new UserDeleted(Guid.NewGuid()), _ => new UserModified(Guid.NewGuid()) }); + await Console.Out.WriteLineAsync($"Publishing {messages + ((result % 3 > 0) ? 1 : 0)} {nameof(UserCreated)}"); + await Console.Out.WriteLineAsync($"Publishing {messages + ((result % 3 > 1) ? 1 : 0)} {nameof(UserDeleted)}"); + await Console.Out.WriteLineAsync($"Publishing {messages} {nameof(UserModified)}"); foreach (var @event in @events) { var transaction = await connection.BeginTransactionAsync(ct).ConfigureAwait(false); @@ -63,6 +66,7 @@ throw; } } + await Console.Out.WriteLineAsync($"Published {result} messages!"); } //use a batch command //{ From e918dfaca1bc6f94b28134e5b3b566370f176c4e Mon Sep 17 00:00:00 2001 From: giordanol Date: Fri, 5 Jul 2024 16:10:32 +0200 Subject: [PATCH 13/28] bumped version to 0.1.1 --- .../Blumchen.DependencyInjection.csproj | 2 +- src/Blumchen/Blumchen.csproj | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Blumchen.DependencyInjection/Blumchen.DependencyInjection.csproj b/src/Blumchen.DependencyInjection/Blumchen.DependencyInjection.csproj index a808d2d..5b9e0e5 100644 --- a/src/Blumchen.DependencyInjection/Blumchen.DependencyInjection.csproj +++ b/src/Blumchen.DependencyInjection/Blumchen.DependencyInjection.csproj @@ -1,7 +1,7 @@ - 0.1.0 + 0.1.1 net8.0 true true diff --git a/src/Blumchen/Blumchen.csproj b/src/Blumchen/Blumchen.csproj index 0f26919..72bdb54 100644 --- a/src/Blumchen/Blumchen.csproj +++ b/src/Blumchen/Blumchen.csproj @@ -1,7 +1,7 @@ - 0.1.0 + 0.1.1 net8.0 true true From 2055fede86e05e100261830ee849076b529c4a23 Mon Sep 17 00:00:00 2001 From: giordanol Date: Fri, 5 Jul 2024 18:52:59 +0200 Subject: [PATCH 14/28] allow tableDescriptor access --- src/Blumchen.DependencyInjection/Workers/Worker.cs | 2 ++ src/Blumchen/MessageTableOptions.cs | 2 ++ src/SubscriberWorker/SubscriberWorker.cs | 1 + 3 files changed, 5 insertions(+) diff --git a/src/Blumchen.DependencyInjection/Workers/Worker.cs b/src/Blumchen.DependencyInjection/Workers/Worker.cs index 1d06a31..ba45bb7 100644 --- a/src/Blumchen.DependencyInjection/Workers/Worker.cs +++ b/src/Blumchen.DependencyInjection/Workers/Worker.cs @@ -20,6 +20,7 @@ public abstract class Worker( INamingPolicy namingPolicy, PublicationManagement.PublicationSetupOptions publicationSetupOptions, ReplicationSlotManagement.ReplicationSlotSetupOptions replicationSlotSetupOptions, + Func tableDescriptorBuilder, ILoggerFactory loggerFactory): BackgroundService where T : class { private readonly ILogger> _logger = loggerFactory.CreateLogger>(); @@ -45,6 +46,7 @@ await pipeline.ExecuteAsync(async token => await using var cursor = subscription.Subscribe(builder => builder .ConnectionString(databaseOptions.ConnectionString) + .WithTable(tableDescriptorBuilder) .WithErrorProcessor(errorProcessor) .Handles>(handler) .NamingPolicy(namingPolicy) diff --git a/src/Blumchen/MessageTableOptions.cs b/src/Blumchen/MessageTableOptions.cs index 7216108..b038de3 100644 --- a/src/Blumchen/MessageTableOptions.cs +++ b/src/Blumchen/MessageTableOptions.cs @@ -33,6 +33,8 @@ public TableDescriptorBuilder MessageType(string name, int dimension = 250) return this; } + public TableDescriptorBuilder UseDefaults() => this; + public record MessageTable(string Name = MessageTable.DefaultName) { internal const string DefaultName = "outbox"; diff --git a/src/SubscriberWorker/SubscriberWorker.cs b/src/SubscriberWorker/SubscriberWorker.cs index 5b97440..d6c4a67 100644 --- a/src/SubscriberWorker/SubscriberWorker.cs +++ b/src/SubscriberWorker/SubscriberWorker.cs @@ -25,4 +25,5 @@ ILoggerFactory loggerFactory , namingPolicy , new PublicationManagement.PublicationSetupOptions($"{typeof(T).Name}_pub") , new ReplicationSlotManagement.ReplicationSlotSetupOptions($"{typeof(T).Name}_slot") + , tableDescriptorBuilder => tableDescriptorBuilder.UseDefaults() , loggerFactory) where T : class; From eadba73a6878176e00bec7340e1719790b7398f8 Mon Sep 17 00:00:00 2001 From: giordanol Date: Fri, 5 Jul 2024 18:58:09 +0200 Subject: [PATCH 15/28] renamed vars --- src/Blumchen.DependencyInjection/Workers/Worker.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Blumchen.DependencyInjection/Workers/Worker.cs b/src/Blumchen.DependencyInjection/Workers/Worker.cs index ba45bb7..8e147fc 100644 --- a/src/Blumchen.DependencyInjection/Workers/Worker.cs +++ b/src/Blumchen.DependencyInjection/Workers/Worker.cs @@ -25,7 +25,7 @@ public abstract class Worker( { private readonly ILogger> _logger = loggerFactory.CreateLogger>(); private string WorkerName { get; } = $"{nameof(Worker)}<{typeof(T).Name}>"; - private static readonly ConcurrentDictionary> _actions = new(StringComparer.OrdinalIgnoreCase); + private static readonly ConcurrentDictionary> LoggingActions = new(StringComparer.OrdinalIgnoreCase); private static void Notify(ILogger logger, LogLevel level, string template, params object[] parameters) { static Action LoggerAction(LogLevel ll, bool enabled) => @@ -35,7 +35,7 @@ static Action LoggerAction(LogLevel ll, bool enabled) (LogLevel.Debug, true) => (logger, template, parameters) => logger.LogDebug(template, parameters), (_, _) => (_, __, ___) => { } }; - _actions.GetOrAdd(template,s => LoggerAction(level, logger.IsEnabled(level)))(logger, template, parameters); + LoggingActions.GetOrAdd(template,s => LoggerAction(level, logger.IsEnabled(level)))(logger, template, parameters); } protected override async Task ExecuteAsync(CancellationToken stoppingToken) From c097047d399b2af0b7c0fe8d5964cdaaa5f392ec Mon Sep 17 00:00:00 2001 From: giordanol Date: Mon, 15 Jul 2024 13:54:45 +0200 Subject: [PATCH 16/28] expose NpgsqlDataSource along with connection string - close #16 --- .../Configuration/DatabaseOptions.cs | 2 -- .../Workers/Worker.cs | 23 +++++++++--------- src/Blumchen/Serialization/ITypeResolver.cs | 4 ++-- .../Subscriptions/ISubscriptionOptions.cs | 10 +++++--- src/Blumchen/Subscriptions/Subscription.cs | 11 +++------ .../SubscriptionOptionsBuilder.cs | 21 ++++++++++++---- src/Subscriber/Program.cs | 6 ++++- src/SubscriberWorker/Program.cs | 24 ++++++++++--------- src/SubscriberWorker/SubscriberWorker.cs | 12 ++++++---- src/Tests/DatabaseFixture.cs | 1 + src/Tests/When_Subscription_Already_Exists.cs | 2 +- ...ption_Does_Not_Exist_And_Table_Is_Empty.cs | 2 +- ...n_Does_Not_Exist_And_Table_Is_Not_Empty.cs | 2 +- 13 files changed, 69 insertions(+), 51 deletions(-) delete mode 100644 src/Blumchen.DependencyInjection/Configuration/DatabaseOptions.cs diff --git a/src/Blumchen.DependencyInjection/Configuration/DatabaseOptions.cs b/src/Blumchen.DependencyInjection/Configuration/DatabaseOptions.cs deleted file mode 100644 index 1be0098..0000000 --- a/src/Blumchen.DependencyInjection/Configuration/DatabaseOptions.cs +++ /dev/null @@ -1,2 +0,0 @@ -namespace Blumchen.Configuration; -public record DatabaseOptions(string ConnectionString); diff --git a/src/Blumchen.DependencyInjection/Workers/Worker.cs b/src/Blumchen.DependencyInjection/Workers/Worker.cs index 8e147fc..6a07118 100644 --- a/src/Blumchen.DependencyInjection/Workers/Worker.cs +++ b/src/Blumchen.DependencyInjection/Workers/Worker.cs @@ -1,18 +1,19 @@ using System.Collections.Concurrent; using System.Text.Json.Serialization; -using Blumchen.Configuration; using Blumchen.Serialization; using Blumchen.Subscriptions; using Blumchen.Subscriptions.Management; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; +using Npgsql; using Polly; namespace Blumchen.Workers; public abstract class Worker( - DatabaseOptions databaseOptions, + NpgsqlDataSource dataSource, + string connectionString, IHandler handler, JsonSerializerContext jsonSerializerContext, IErrorProcessor errorProcessor, @@ -21,9 +22,8 @@ public abstract class Worker( PublicationManagement.PublicationSetupOptions publicationSetupOptions, ReplicationSlotManagement.ReplicationSlotSetupOptions replicationSlotSetupOptions, Func tableDescriptorBuilder, - ILoggerFactory loggerFactory): BackgroundService where T : class + ILogger logger): BackgroundService where T : class { - private readonly ILogger> _logger = loggerFactory.CreateLogger>(); private string WorkerName { get; } = $"{nameof(Worker)}<{typeof(T).Name}>"; private static readonly ConcurrentDictionary> LoggingActions = new(StringComparer.OrdinalIgnoreCase); private static void Notify(ILogger logger, LogLevel level, string template, params object[] parameters) @@ -33,9 +33,9 @@ static Action LoggerAction(LogLevel ll, bool enabled) { (LogLevel.Information, true) => (logger, template, parameters) => logger.LogInformation(template, parameters), (LogLevel.Debug, true) => (logger, template, parameters) => logger.LogDebug(template, parameters), - (_, _) => (_, __, ___) => { } + (_, _) => (_, _, _) => { } }; - LoggingActions.GetOrAdd(template,s => LoggerAction(level, logger.IsEnabled(level)))(logger, template, parameters); + LoggingActions.GetOrAdd(template,_ => LoggerAction(level, logger.IsEnabled(level)))(logger, template, parameters); } protected override async Task ExecuteAsync(CancellationToken stoppingToken) @@ -45,7 +45,8 @@ await pipeline.ExecuteAsync(async token => await using var subscription = new Subscription(); await using var cursor = subscription.Subscribe(builder => builder - .ConnectionString(databaseOptions.ConnectionString) + .DataSource(dataSource) + .ConnectionString(connectionString) .WithTable(tableDescriptorBuilder) .WithErrorProcessor(errorProcessor) .Handles>(handler) @@ -53,13 +54,13 @@ await pipeline.ExecuteAsync(async token => .JsonContext(jsonSerializerContext) .WithPublicationOptions(publicationSetupOptions) .WithReplicationOptions(replicationSlotSetupOptions) - , ct: token, loggerFactory: loggerFactory).GetAsyncEnumerator(token); - Notify(_logger, LogLevel.Information,"{WorkerName} started", WorkerName); + , ct: token).GetAsyncEnumerator(token); + Notify(logger, LogLevel.Information,"{WorkerName} started", WorkerName); while (await cursor.MoveNextAsync().ConfigureAwait(false) && !token.IsCancellationRequested) - Notify(_logger, LogLevel.Debug, "{cursor.Current} processed", cursor.Current); + Notify(logger, LogLevel.Debug, "{cursor.Current} processed", cursor.Current); }, stoppingToken).ConfigureAwait(false); - Notify(_logger, LogLevel.Information, "{WorkerName} stopped", WorkerName); + Notify(logger, LogLevel.Information, "{WorkerName} stopped", WorkerName); return; } diff --git a/src/Blumchen/Serialization/ITypeResolver.cs b/src/Blumchen/Serialization/ITypeResolver.cs index 71e49d3..cdddbf4 100644 --- a/src/Blumchen/Serialization/ITypeResolver.cs +++ b/src/Blumchen/Serialization/ITypeResolver.cs @@ -24,8 +24,8 @@ internal sealed class JsonTypeResolver( internal void WhiteList(Type type) { var typeInfo = SerializationContext.GetTypeInfo(type) ?? throw new NotSupportedException(type.FullName); - _typeDictionary.AddOrUpdate(_namingPolicy.Bind(typeInfo.Type), _ => typeInfo.Type, (s,t) =>typeInfo.Type); - _typeInfoDictionary.AddOrUpdate(typeInfo.Type, _ => typeInfo, (_,__)=> typeInfo); + _typeDictionary.AddOrUpdate(_namingPolicy.Bind(typeInfo.Type), _ => typeInfo.Type, (_,_) =>typeInfo.Type); + _typeInfoDictionary.AddOrUpdate(typeInfo.Type, _ => typeInfo, (_,_)=> typeInfo); } public (string, JsonTypeInfo) Resolve(Type type) => diff --git a/src/Blumchen/Subscriptions/ISubscriptionOptions.cs b/src/Blumchen/Subscriptions/ISubscriptionOptions.cs index 79894e1..83a383c 100644 --- a/src/Blumchen/Subscriptions/ISubscriptionOptions.cs +++ b/src/Blumchen/Subscriptions/ISubscriptionOptions.cs @@ -1,5 +1,6 @@ using Blumchen.Subscriptions.Replication; using JetBrains.Annotations; +using Npgsql; using static Blumchen.Subscriptions.Management.PublicationManagement; using static Blumchen.Subscriptions.Management.ReplicationSlotManagement; @@ -7,14 +8,16 @@ namespace Blumchen.Subscriptions; internal interface ISubscriptionOptions { - [UsedImplicitly] string ConnectionString { get; } + [UsedImplicitly] NpgsqlDataSource DataSource { get; } + [UsedImplicitly] NpgsqlConnectionStringBuilder ConnectionStringBuilder { get; } IReplicationDataMapper DataMapper { get; } [UsedImplicitly] PublicationSetupOptions PublicationOptions { get; } [UsedImplicitly] ReplicationSlotSetupOptions ReplicationOptions { get; } [UsedImplicitly] IErrorProcessor ErrorProcessor { get; } void Deconstruct( - out string connectionString, + out NpgsqlDataSource dataSource, + out NpgsqlConnectionStringBuilder connectionStringBuilder, out PublicationSetupOptions publicationSetupOptions, out ReplicationSlotSetupOptions replicationSlotSetupOptions, out IErrorProcessor errorProcessor, @@ -23,7 +26,8 @@ void Deconstruct( } internal record SubscriptionOptions( - string ConnectionString, + NpgsqlDataSource DataSource, + NpgsqlConnectionStringBuilder ConnectionStringBuilder, PublicationSetupOptions PublicationOptions, ReplicationSlotSetupOptions ReplicationOptions, IErrorProcessor ErrorProcessor, diff --git a/src/Blumchen/Subscriptions/Subscription.cs b/src/Blumchen/Subscriptions/Subscription.cs index 57dd724..a06adcb 100644 --- a/src/Blumchen/Subscriptions/Subscription.cs +++ b/src/Blumchen/Subscriptions/Subscription.cs @@ -30,22 +30,17 @@ public enum CreateStyle private ISubscriptionOptions? _options; public async IAsyncEnumerable Subscribe( Func builder, - ILoggerFactory? loggerFactory = null, [EnumeratorCancellation] CancellationToken ct = default ) { _options = builder(_builder).Build(); - var (connectionString, publicationSetupOptions, replicationSlotSetupOptions, errorProcessor, replicationDataMapper, registry) = _options; - var dataSourceBuilder = new NpgsqlDataSourceBuilder(connectionString); - dataSourceBuilder.UseLoggerFactory(loggerFactory); - - var dataSource = dataSourceBuilder.Build(); + var (dataSource, connectionStringBuilder, publicationSetupOptions, replicationSlotSetupOptions, errorProcessor, replicationDataMapper, registry) = _options; + await dataSource.EnsureTableExists(publicationSetupOptions.TableDescriptor, ct).ConfigureAwait(false); - _connection = new LogicalReplicationConnection(connectionString); + _connection = new LogicalReplicationConnection(connectionStringBuilder.ConnectionString); await _connection.Open(ct).ConfigureAwait(false); - await dataSource.SetupPublication(publicationSetupOptions, ct).ConfigureAwait(false); var result = await dataSource.SetupReplicationSlot(_connection, replicationSlotSetupOptions, ct).ConfigureAwait(false); diff --git a/src/Blumchen/Subscriptions/SubscriptionOptionsBuilder.cs b/src/Blumchen/Subscriptions/SubscriptionOptionsBuilder.cs index 9b7e7c5..8c4365b 100644 --- a/src/Blumchen/Subscriptions/SubscriptionOptionsBuilder.cs +++ b/src/Blumchen/Subscriptions/SubscriptionOptionsBuilder.cs @@ -2,13 +2,15 @@ using Blumchen.Subscriptions.Management; using Blumchen.Subscriptions.Replication; using JetBrains.Annotations; +using Npgsql; using System.Text.Json.Serialization; namespace Blumchen.Subscriptions; public sealed class SubscriptionOptionsBuilder { - private static string? _connectionString; + private static NpgsqlConnectionStringBuilder? _connectionStringBuilder; + private static NpgsqlDataSource? _dataSource; private static PublicationManagement.PublicationSetupOptions _publicationSetupOptions; private static ReplicationSlotManagement.ReplicationSlotSetupOptions? _replicationSlotSetupOptions; private static IReplicationDataMapper? _dataMapper; @@ -22,7 +24,7 @@ public sealed class SubscriptionOptionsBuilder static SubscriptionOptionsBuilder() { - _connectionString = null; + _connectionStringBuilder = default; _publicationSetupOptions = new(); _replicationSlotSetupOptions = default; _dataMapper = default; @@ -40,7 +42,14 @@ public SubscriptionOptionsBuilder WithTable( [UsedImplicitly] public SubscriptionOptionsBuilder ConnectionString(string connectionString) { - _connectionString = connectionString; + _connectionStringBuilder = new NpgsqlConnectionStringBuilder(connectionString); + return this; + } + + [UsedImplicitly] + public SubscriptionOptionsBuilder DataSource(NpgsqlDataSource dataSource) + { + _dataSource = dataSource; return this; } @@ -91,7 +100,8 @@ public SubscriptionOptionsBuilder WithErrorProcessor(IErrorProcessor? errorProce internal ISubscriptionOptions Build() { _messageTable ??= TableDescriptorBuilder.Build(); - ArgumentNullException.ThrowIfNull(_connectionString); + ArgumentNullException.ThrowIfNull(_connectionStringBuilder); + ArgumentNullException.ThrowIfNull(_dataSource); ArgumentNullException.ThrowIfNull(_jsonSerializerContext); var typeResolver = new JsonTypeResolver(_jsonSerializerContext, _namingPolicy); @@ -104,7 +114,8 @@ internal ISubscriptionOptions Build() if (_registry.Count == 0)_registry.Add(typeof(object), new ObjectTracingConsumer()); return new SubscriptionOptions( - _connectionString, + _dataSource, + _connectionStringBuilder, _publicationSetupOptions, _replicationSlotSetupOptions ?? new ReplicationSlotManagement.ReplicationSlotSetupOptions(), _errorProcessor ?? new ConsoleOutErrorProcessor(), diff --git a/src/Subscriber/Program.cs b/src/Subscriber/Program.cs index ca08c10..eabee73 100644 --- a/src/Subscriber/Program.cs +++ b/src/Subscriber/Program.cs @@ -2,6 +2,7 @@ using Blumchen.Subscriptions; using Commons; using Microsoft.Extensions.Logging; +using Npgsql; using Subscriber; #pragma warning disable CS8601 // Possible null reference assignment. @@ -20,8 +21,11 @@ try { + var dataSourceBuilder = new NpgsqlDataSourceBuilder(Settings.ConnectionString) + .UseLoggerFactory(LoggerFactory.Create(builder => builder.AddConsole())); var cursor = subscription.Subscribe( builder => builder + .DataSource(dataSourceBuilder.Build()) .ConnectionString(Settings.ConnectionString) .WithTable(options => options .Id("id") @@ -31,7 +35,7 @@ .NamingPolicy(new AttributeNamingPolicy()) .JsonContext(SourceGenerationContext.Default) .Handles(consumer) - .Handles(consumer), LoggerFactory.Create(builder => builder.AddConsole()), ct + .Handles(consumer), ct:ct ).GetAsyncEnumerator(ct); await using var cursor1 = cursor.ConfigureAwait(false); while (await cursor.MoveNextAsync().ConfigureAwait(false) && !ct.IsCancellationRequested); diff --git a/src/SubscriberWorker/Program.cs b/src/SubscriberWorker/Program.cs index 6f224e4..3349b40 100644 --- a/src/SubscriberWorker/Program.cs +++ b/src/SubscriberWorker/Program.cs @@ -1,5 +1,4 @@ using System.Text.Json.Serialization; -using Blumchen.Configuration; using Blumchen.Serialization; using Blumchen.Subscriptions; using Blumchen.Workers; @@ -10,6 +9,7 @@ using Polly.Retry; using Polly; using SubscriberWorker; +using Npgsql; #pragma warning disable CS8601 // Possible null reference assignment. @@ -29,19 +29,21 @@ .AddSingleton, Handler>() .AddBlumchen, UserDeletedContract>() .AddSingleton, Handler>() - + .AddSingleton(Settings.ConnectionString) + .AddTransient(sp => + new NpgsqlDataSourceBuilder(Settings.ConnectionString) + .UseLoggerFactory(sp.GetRequiredService()).Build()) .AddSingleton() .AddSingleton() .AddSingleton() - .AddSingleton(new DatabaseOptions(Settings.ConnectionString)) - .AddResiliencePipeline("default",(pipelineBuilder,context) => + .AddResiliencePipeline("default", (pipelineBuilder, _) => pipelineBuilder - .AddRetry(new RetryStrategyOptions - { - BackoffType = DelayBackoffType.Constant, - Delay = TimeSpan.FromSeconds(5), - MaxRetryAttempts = int.MaxValue - }).Build()) + .AddRetry(new RetryStrategyOptions + { + BackoffType = DelayBackoffType.Constant, + Delay = TimeSpan.FromSeconds(5), + MaxRetryAttempts = int.MaxValue + }).Build()) .AddLogging(loggingBuilder => { loggingBuilder @@ -51,7 +53,7 @@ .AddFilter("Blumchen", LogLevel.Debug) .AddFilter("SubscriberWorker", LogLevel.Debug) .AddSimpleConsole(); - }); + }).AddSingleton(sp => sp.GetRequiredService().CreateLogger()); await builder .Build() diff --git a/src/SubscriberWorker/SubscriberWorker.cs b/src/SubscriberWorker/SubscriberWorker.cs index d6c4a67..cb28195 100644 --- a/src/SubscriberWorker/SubscriberWorker.cs +++ b/src/SubscriberWorker/SubscriberWorker.cs @@ -1,23 +1,25 @@ using System.Text.Json.Serialization; -using Blumchen.Configuration; using Blumchen.Serialization; using Blumchen.Subscriptions; using Blumchen.Subscriptions.Management; using Blumchen.Workers; using Microsoft.Extensions.Logging; +using Npgsql; using Polly.Registry; // ReSharper disable ClassNeverInstantiated.Global namespace SubscriberWorker; public class SubscriberWorker( - DatabaseOptions databaseOptions, + NpgsqlDataSource dataSource, + string connectionString, IHandler handler, JsonSerializerContext jsonSerializerContext, ResiliencePipelineProvider pipelineProvider, INamingPolicy namingPolicy, IErrorProcessor errorProcessor, - ILoggerFactory loggerFactory -): Worker(databaseOptions + ILogger logger +): Worker(dataSource + , connectionString , handler , jsonSerializerContext , errorProcessor @@ -26,4 +28,4 @@ ILoggerFactory loggerFactory , new PublicationManagement.PublicationSetupOptions($"{typeof(T).Name}_pub") , new ReplicationSlotManagement.ReplicationSlotSetupOptions($"{typeof(T).Name}_slot") , tableDescriptorBuilder => tableDescriptorBuilder.UseDefaults() - , loggerFactory) where T : class; + , logger) where T : class; diff --git a/src/Tests/DatabaseFixture.cs b/src/Tests/DatabaseFixture.cs index 9bc2e9a..19d0af5 100644 --- a/src/Tests/DatabaseFixture.cs +++ b/src/Tests/DatabaseFixture.cs @@ -81,6 +81,7 @@ protected static async Task InsertPoisoningMessage(string connectionString, stri var consumer = new TestHandler(log, jsonTypeInfo); var subscriptionOptionsBuilder = new SubscriptionOptionsBuilder() .WithErrorProcessor(new TestOutErrorProcessor(Output)) + .DataSource(new NpgsqlDataSourceBuilder(connectionString).Build()) .ConnectionString(connectionString) .JsonContext(info) .NamingPolicy(namingPolicy) diff --git a/src/Tests/When_Subscription_Already_Exists.cs b/src/Tests/When_Subscription_Already_Exists.cs index 4575b69..71b78c9 100644 --- a/src/Tests/When_Subscription_Already_Exists.cs +++ b/src/Tests/When_Subscription_Already_Exists.cs @@ -46,7 +46,7 @@ await MessageAppender.AppendAsync( var subscription = new Subscription(); await using var subscription1 = subscription.ConfigureAwait(false); - await foreach (var envelope in subscription.Subscribe(_ => subscriptionOptions, null, ct).ConfigureAwait(false)) + await foreach (var envelope in subscription.Subscribe(_ => subscriptionOptions, ct).ConfigureAwait(false)) { Assert.Equal(@expected, ((OkEnvelope)envelope).Value); return; diff --git a/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Empty.cs b/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Empty.cs index 71e78e4..3a52e23 100644 --- a/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Empty.cs +++ b/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Empty.cs @@ -38,7 +38,7 @@ public async Task Read_from_table_using_named_transaction_snapshot() SubscriberContext.Default, sharedNamingPolicy, Output.WriteLine); var subscription = new Subscription(); await using var subscription1 = subscription.ConfigureAwait(false); - await foreach (var envelope in subscription.Subscribe(_ => subscriptionOptions, null, ct).ConfigureAwait(false)) + await foreach (var envelope in subscription.Subscribe(_ => subscriptionOptions, ct).ConfigureAwait(false)) { Assert.Equal(@expected, ((OkEnvelope)envelope).Value); return; diff --git a/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Not_Empty.cs b/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Not_Empty.cs index 6422396..d1029fb 100644 --- a/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Not_Empty.cs +++ b/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Not_Empty.cs @@ -40,7 +40,7 @@ public async Task Read_from_table_using_named_transaction_snapshot() var subscription = new Subscription(); await using var subscription1 = subscription.ConfigureAwait(false); - await foreach (var envelope in subscription.Subscribe(_ => subscriptionOptions, null, ct).ConfigureAwait(false)) + await foreach (var envelope in subscription.Subscribe(_ => subscriptionOptions, ct).ConfigureAwait(false)) { Assert.Equal(@expected, ((OkEnvelope)envelope).Value); return; From ef9ac954e253c316e97cc32bd7e8a6d6325b1740 Mon Sep 17 00:00:00 2001 From: giordanol Date: Mon, 15 Jul 2024 14:56:40 +0200 Subject: [PATCH 17/28] rename extension method --- src/Blumchen.DependencyInjection/Workers/Worker.cs | 2 +- src/Blumchen/Subscriptions/SubscriptionOptionsBuilder.cs | 2 +- src/Subscriber/Program.cs | 4 ++-- src/Tests/DatabaseFixture.cs | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/Blumchen.DependencyInjection/Workers/Worker.cs b/src/Blumchen.DependencyInjection/Workers/Worker.cs index 6a07118..2d69ccd 100644 --- a/src/Blumchen.DependencyInjection/Workers/Worker.cs +++ b/src/Blumchen.DependencyInjection/Workers/Worker.cs @@ -49,7 +49,7 @@ await pipeline.ExecuteAsync(async token => .ConnectionString(connectionString) .WithTable(tableDescriptorBuilder) .WithErrorProcessor(errorProcessor) - .Handles>(handler) + .Consumes>(handler) .NamingPolicy(namingPolicy) .JsonContext(jsonSerializerContext) .WithPublicationOptions(publicationSetupOptions) diff --git a/src/Blumchen/Subscriptions/SubscriptionOptionsBuilder.cs b/src/Blumchen/Subscriptions/SubscriptionOptionsBuilder.cs index 8c4365b..cf39401 100644 --- a/src/Blumchen/Subscriptions/SubscriptionOptionsBuilder.cs +++ b/src/Blumchen/Subscriptions/SubscriptionOptionsBuilder.cs @@ -83,7 +83,7 @@ public SubscriptionOptionsBuilder WithReplicationOptions(ReplicationSlotManageme } [UsedImplicitly] - public SubscriptionOptionsBuilder Handles(TU handler) where T : class + public SubscriptionOptionsBuilder Consumes(TU handler) where T : class where TU : class, IHandler { _registry.TryAdd(typeof(T), handler); diff --git a/src/Subscriber/Program.cs b/src/Subscriber/Program.cs index eabee73..8c8e057 100644 --- a/src/Subscriber/Program.cs +++ b/src/Subscriber/Program.cs @@ -34,8 +34,8 @@ ) .NamingPolicy(new AttributeNamingPolicy()) .JsonContext(SourceGenerationContext.Default) - .Handles(consumer) - .Handles(consumer), ct:ct + .Consumes(consumer) + .Consumes(consumer), ct:ct ).GetAsyncEnumerator(ct); await using var cursor1 = cursor.ConfigureAwait(false); while (await cursor.MoveNextAsync().ConfigureAwait(false) && !ct.IsCancellationRequested); diff --git a/src/Tests/DatabaseFixture.cs b/src/Tests/DatabaseFixture.cs index 19d0af5..78e1525 100644 --- a/src/Tests/DatabaseFixture.cs +++ b/src/Tests/DatabaseFixture.cs @@ -85,7 +85,7 @@ protected static async Task InsertPoisoningMessage(string connectionString, stri .ConnectionString(connectionString) .JsonContext(info) .NamingPolicy(namingPolicy) - .Handles>(consumer) + .Consumes>(consumer) .WithTable(o => o.Name(eventsTable)) .WithPublicationOptions( new PublicationManagement.PublicationSetupOptions(PublicationName: publicationName ?? Randomise("events_pub")) From 345cad6f1b0f67075b5eeb9523ea86c8f1b45b51 Mon Sep 17 00:00:00 2001 From: giordanol Date: Mon, 15 Jul 2024 15:01:19 +0200 Subject: [PATCH 18/28] move to files --- src/Blumchen/Subscriptions/IErrorProcessor.cs | 11 ++++++++++ .../Subscriptions/ObjectTracingConsumer.cs | 11 ++++++++++ .../SubscriptionOptionsBuilder.cs | 20 ------------------- src/SubscriberWorker/Handler.cs | 7 +++---- 4 files changed, 25 insertions(+), 24 deletions(-) create mode 100644 src/Blumchen/Subscriptions/IErrorProcessor.cs create mode 100644 src/Blumchen/Subscriptions/ObjectTracingConsumer.cs diff --git a/src/Blumchen/Subscriptions/IErrorProcessor.cs b/src/Blumchen/Subscriptions/IErrorProcessor.cs new file mode 100644 index 0000000..b3cec9d --- /dev/null +++ b/src/Blumchen/Subscriptions/IErrorProcessor.cs @@ -0,0 +1,11 @@ +namespace Blumchen.Subscriptions; + +public interface IErrorProcessor +{ + Func Process { get; } +} + +public record ConsoleOutErrorProcessor: IErrorProcessor +{ + public Func Process => exception => Console.Out.WriteLineAsync($"record id:{0} resulted in error:{exception.Message}"); +} diff --git a/src/Blumchen/Subscriptions/ObjectTracingConsumer.cs b/src/Blumchen/Subscriptions/ObjectTracingConsumer.cs new file mode 100644 index 0000000..3263482 --- /dev/null +++ b/src/Blumchen/Subscriptions/ObjectTracingConsumer.cs @@ -0,0 +1,11 @@ +namespace Blumchen.Subscriptions; + +internal class ObjectTracingConsumer: IHandler +{ + private static ulong _counter = 0; + public Task Handle(object value) + { + Interlocked.Increment(ref _counter); + return Console.Out.WriteLineAsync(); + } +} diff --git a/src/Blumchen/Subscriptions/SubscriptionOptionsBuilder.cs b/src/Blumchen/Subscriptions/SubscriptionOptionsBuilder.cs index cf39401..22162a1 100644 --- a/src/Blumchen/Subscriptions/SubscriptionOptionsBuilder.cs +++ b/src/Blumchen/Subscriptions/SubscriptionOptionsBuilder.cs @@ -129,23 +129,3 @@ static void Ensure(Func> evalFn, string formattedMsg) } } - -public class ObjectTracingConsumer: IHandler -{ - private static ulong _counter = 0; - public Task Handle(object value) - { - Interlocked.Increment(ref _counter); - return Console.Out.WriteLineAsync(); - } -} - -public interface IErrorProcessor -{ - Func Process { get; } -} - -public record ConsoleOutErrorProcessor: IErrorProcessor -{ - public Func Process => exception => Console.Out.WriteLineAsync($"record id:{0} resulted in error:{exception.Message}"); -} diff --git a/src/SubscriberWorker/Handler.cs b/src/SubscriberWorker/Handler.cs index 95c003c..070fc8c 100644 --- a/src/SubscriberWorker/Handler.cs +++ b/src/SubscriberWorker/Handler.cs @@ -5,13 +5,12 @@ namespace SubscriberWorker; -public class Handler(ILoggerFactory loggerFactory): IHandler where T : class +public class Handler(ILogger logger): IHandler where T : class { - private readonly ILogger _logger = loggerFactory.CreateLogger>(); private Task ReportSuccess(int count) { - if(_logger.IsEnabled(LogLevel.Debug)) - _logger.LogDebug($"Read #{count} messages {typeof(T).FullName}"); + if(logger.IsEnabled(LogLevel.Debug)) + logger.LogDebug($"Read #{count} messages {typeof(T).FullName}"); return Task.CompletedTask; } From 645ec1a0bb7c4f487068250c033d3a169a704a79 Mon Sep 17 00:00:00 2001 From: giordanol Date: Tue, 16 Jul 2024 19:13:29 +0200 Subject: [PATCH 19/28] simplified di registration --- .../Blumchen.DependencyInjection.csproj | 5 ++ .../Workers/ServiceCollectionExtensions.cs | 21 +++++-- .../Workers/Worker.cs | 38 ++----------- .../Workers/WorkerOptionsBuilder.cs | 37 +++++++++++++ src/Blumchen/Blumchen.csproj | 3 + .../Subscriptions/ISubscriptionOptions.cs | 2 +- src/Blumchen/Subscriptions/Subscription.cs | 20 +++++-- .../SubscriptionOptionsBuilder.cs | 31 ++++------- src/Publisher/Contracts.cs | 8 ++- src/Publisher/Program.cs | 14 +++-- src/Subscriber/Program.cs | 4 +- src/SubscriberWorker/Contracts.cs | 7 +++ src/SubscriberWorker/Handler.cs | 29 +++++++--- src/SubscriberWorker/Program.cs | 55 ++++++++++++++++--- src/SubscriberWorker/SubscriberWorker.cs | 31 ----------- src/Tests/DatabaseFixture.cs | 2 +- 16 files changed, 186 insertions(+), 121 deletions(-) create mode 100644 src/Blumchen.DependencyInjection/Workers/WorkerOptionsBuilder.cs delete mode 100644 src/SubscriberWorker/SubscriberWorker.cs diff --git a/src/Blumchen.DependencyInjection/Blumchen.DependencyInjection.csproj b/src/Blumchen.DependencyInjection/Blumchen.DependencyInjection.csproj index 5b9e0e5..bb407ba 100644 --- a/src/Blumchen.DependencyInjection/Blumchen.DependencyInjection.csproj +++ b/src/Blumchen.DependencyInjection/Blumchen.DependencyInjection.csproj @@ -38,6 +38,11 @@ + + all + none + all + diff --git a/src/Blumchen.DependencyInjection/Workers/ServiceCollectionExtensions.cs b/src/Blumchen.DependencyInjection/Workers/ServiceCollectionExtensions.cs index ec4e34c..0c8a6a0 100644 --- a/src/Blumchen.DependencyInjection/Workers/ServiceCollectionExtensions.cs +++ b/src/Blumchen.DependencyInjection/Workers/ServiceCollectionExtensions.cs @@ -1,13 +1,24 @@ +using Blumchen.Subscriptions; using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Polly; + #pragma warning disable IL2091 namespace Blumchen.Workers; public static class ServiceCollectionExtensions { - public static IServiceCollection AddBlumchen(this IServiceCollection service, T? instance = default) - where T : Worker where TU : class => - instance is null - ? service.AddHostedService() - : service.AddHostedService(_=>instance); + + public static IServiceCollection AddBlumchen( + this IServiceCollection service, + Func workerOptions) + where T : class, IHandler => + service + .AddKeyedSingleton(typeof(T), (provider, _) => workerOptions(provider, new WorkerOptionsBuilder()).Build()) + .AddHostedService(provider => + new Worker(workerOptions(provider, new WorkerOptionsBuilder()).Build(), + provider.GetRequiredService>>())); + + } diff --git a/src/Blumchen.DependencyInjection/Workers/Worker.cs b/src/Blumchen.DependencyInjection/Workers/Worker.cs index 2d69ccd..dbca462 100644 --- a/src/Blumchen.DependencyInjection/Workers/Worker.cs +++ b/src/Blumchen.DependencyInjection/Workers/Worker.cs @@ -1,28 +1,13 @@ using System.Collections.Concurrent; -using System.Text.Json.Serialization; -using Blumchen.Serialization; using Blumchen.Subscriptions; -using Blumchen.Subscriptions.Management; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; -using Npgsql; -using Polly; - namespace Blumchen.Workers; -public abstract class Worker( - NpgsqlDataSource dataSource, - string connectionString, - IHandler handler, - JsonSerializerContext jsonSerializerContext, - IErrorProcessor errorProcessor, - ResiliencePipeline pipeline, - INamingPolicy namingPolicy, - PublicationManagement.PublicationSetupOptions publicationSetupOptions, - ReplicationSlotManagement.ReplicationSlotSetupOptions replicationSlotSetupOptions, - Func tableDescriptorBuilder, - ILogger logger): BackgroundService where T : class +public class Worker( + WorkerOptions options, + ILogger> logger): BackgroundService where T : class, IHandler { private string WorkerName { get; } = $"{nameof(Worker)}<{typeof(T).Name}>"; private static readonly ConcurrentDictionary> LoggingActions = new(StringComparer.OrdinalIgnoreCase); @@ -40,28 +25,17 @@ static Action LoggerAction(LogLevel ll, bool enabled) protected override async Task ExecuteAsync(CancellationToken stoppingToken) { - await pipeline.ExecuteAsync(async token => + await options.ResiliencePipeline.ExecuteAsync(async token => { await using var subscription = new Subscription(); - await using var cursor = subscription.Subscribe(builder => - builder - .DataSource(dataSource) - .ConnectionString(connectionString) - .WithTable(tableDescriptorBuilder) - .WithErrorProcessor(errorProcessor) - .Consumes>(handler) - .NamingPolicy(namingPolicy) - .JsonContext(jsonSerializerContext) - .WithPublicationOptions(publicationSetupOptions) - .WithReplicationOptions(replicationSlotSetupOptions) - , ct: token).GetAsyncEnumerator(token); + await using var cursor = subscription.Subscribe(options.SubscriptionOptions, ct: token) + .GetAsyncEnumerator(token); Notify(logger, LogLevel.Information,"{WorkerName} started", WorkerName); while (await cursor.MoveNextAsync().ConfigureAwait(false) && !token.IsCancellationRequested) Notify(logger, LogLevel.Debug, "{cursor.Current} processed", cursor.Current); }, stoppingToken).ConfigureAwait(false); Notify(logger, LogLevel.Information, "{WorkerName} stopped", WorkerName); - return; } } diff --git a/src/Blumchen.DependencyInjection/Workers/WorkerOptionsBuilder.cs b/src/Blumchen.DependencyInjection/Workers/WorkerOptionsBuilder.cs new file mode 100644 index 0000000..f146b75 --- /dev/null +++ b/src/Blumchen.DependencyInjection/Workers/WorkerOptionsBuilder.cs @@ -0,0 +1,37 @@ +using Blumchen.Subscriptions; +using Polly; + +namespace Blumchen.Workers; + +public record WorkerOptions(ResiliencePipeline ResiliencePipeline, ISubscriptionOptions SubscriptionOptions); + +public interface IWorkerOptionsBuilder +{ + IWorkerOptionsBuilder ResiliencyPipeline(ResiliencePipeline resiliencePipeline); + IWorkerOptionsBuilder Subscription(Func? builder); + WorkerOptions Build(); +} + +internal sealed class WorkerOptionsBuilder: IWorkerOptionsBuilder +{ + private ResiliencePipeline? _resiliencePipeline = default; + private Func? _builder; + + public IWorkerOptionsBuilder ResiliencyPipeline(ResiliencePipeline resiliencePipeline) + { + _resiliencePipeline = resiliencePipeline; + return this; + }public IWorkerOptionsBuilder Subscription(Func? builder) + { + _builder = builder; + return this; + } + + public WorkerOptions Build() + { + ArgumentNullException.ThrowIfNull(_resiliencePipeline); + ArgumentNullException.ThrowIfNull(_builder); + return new(_resiliencePipeline, _builder(new SubscriptionOptionsBuilder()).Build()); + } +} + diff --git a/src/Blumchen/Blumchen.csproj b/src/Blumchen/Blumchen.csproj index 72bdb54..9847f66 100644 --- a/src/Blumchen/Blumchen.csproj +++ b/src/Blumchen/Blumchen.csproj @@ -39,6 +39,9 @@ <_Parameter1>Tests + + <_Parameter1>Blumchen.DependencyInjection + diff --git a/src/Blumchen/Subscriptions/ISubscriptionOptions.cs b/src/Blumchen/Subscriptions/ISubscriptionOptions.cs index 83a383c..27dd513 100644 --- a/src/Blumchen/Subscriptions/ISubscriptionOptions.cs +++ b/src/Blumchen/Subscriptions/ISubscriptionOptions.cs @@ -6,7 +6,7 @@ namespace Blumchen.Subscriptions; -internal interface ISubscriptionOptions +public interface ISubscriptionOptions { [UsedImplicitly] NpgsqlDataSource DataSource { get; } [UsedImplicitly] NpgsqlConnectionStringBuilder ConnectionStringBuilder { get; } diff --git a/src/Blumchen/Subscriptions/Subscription.cs b/src/Blumchen/Subscriptions/Subscription.cs index a06adcb..872df0b 100644 --- a/src/Blumchen/Subscriptions/Subscription.cs +++ b/src/Blumchen/Subscriptions/Subscription.cs @@ -5,7 +5,6 @@ using Blumchen.Subscriptions.Management; using Blumchen.Subscriptions.ReplicationMessageHandlers; using Blumchen.Subscriptions.SnapshotReader; -using Microsoft.Extensions.Logging; using Npgsql; using Npgsql.Replication; using Npgsql.Replication.PgOutput; @@ -33,9 +32,18 @@ public async IAsyncEnumerable Subscribe( [EnumeratorCancellation] CancellationToken ct = default ) { - _options = builder(_builder).Build(); - var (dataSource, connectionStringBuilder, publicationSetupOptions, replicationSlotSetupOptions, errorProcessor, replicationDataMapper, registry) = _options; - + await foreach (var _ in Subscribe(builder(_builder).Build(), ct)) + yield return _; + } + + internal async IAsyncEnumerable Subscribe( + ISubscriptionOptions subscriptionOptions, + [EnumeratorCancellation] CancellationToken ct = default + ) + { + _options = subscriptionOptions; + var (dataSource, connectionStringBuilder, publicationSetupOptions, replicationSlotSetupOptions, errorProcessor, replicationDataMapper, registry) = subscriptionOptions; + await dataSource.EnsureTableExists(publicationSetupOptions.TableDescriptor, ct).ConfigureAwait(false); _connection = new LogicalReplicationConnection(connectionStringBuilder.ConnectionString); @@ -60,8 +68,8 @@ public async IAsyncEnumerable Subscribe( ); await foreach (var envelope in ReadExistingRowsFromSnapshot(dataSource, created.SnapshotName, _options, ct).ConfigureAwait(false)) - await foreach (var subscribe in ProcessEnvelope(envelope, registry, errorProcessor).WithCancellation(ct).ConfigureAwait(false)) - yield return subscribe; + await foreach (var subscribe in ProcessEnvelope(envelope, registry, errorProcessor).WithCancellation(ct).ConfigureAwait(false)) + yield return subscribe; } await foreach (var message in diff --git a/src/Blumchen/Subscriptions/SubscriptionOptionsBuilder.cs b/src/Blumchen/Subscriptions/SubscriptionOptionsBuilder.cs index 22162a1..8166ab2 100644 --- a/src/Blumchen/Subscriptions/SubscriptionOptionsBuilder.cs +++ b/src/Blumchen/Subscriptions/SubscriptionOptionsBuilder.cs @@ -9,33 +9,23 @@ namespace Blumchen.Subscriptions; public sealed class SubscriptionOptionsBuilder { - private static NpgsqlConnectionStringBuilder? _connectionStringBuilder; - private static NpgsqlDataSource? _dataSource; - private static PublicationManagement.PublicationSetupOptions _publicationSetupOptions; - private static ReplicationSlotManagement.ReplicationSlotSetupOptions? _replicationSlotSetupOptions; - private static IReplicationDataMapper? _dataMapper; + private NpgsqlConnectionStringBuilder? _connectionStringBuilder; + private NpgsqlDataSource? _dataSource; + private PublicationManagement.PublicationSetupOptions _publicationSetupOptions = new(); + private ReplicationSlotManagement.ReplicationSlotSetupOptions? _replicationSlotSetupOptions; + private IReplicationDataMapper? _dataMapper; private readonly Dictionary _registry = []; private IErrorProcessor? _errorProcessor; private INamingPolicy? _namingPolicy; private JsonSerializerContext? _jsonSerializerContext; - private static readonly TableDescriptorBuilder TableDescriptorBuilder = new(); + private readonly TableDescriptorBuilder _tableDescriptorBuilder = new(); private TableDescriptorBuilder.MessageTable? _messageTable; - - - static SubscriptionOptionsBuilder() - { - _connectionStringBuilder = default; - _publicationSetupOptions = new(); - _replicationSlotSetupOptions = default; - _dataMapper = default; - } - [UsedImplicitly] public SubscriptionOptionsBuilder WithTable( Func builder) { - _messageTable = builder(TableDescriptorBuilder).Build(); + _messageTable = builder(_tableDescriptorBuilder).Build(); return this; } @@ -83,8 +73,7 @@ public SubscriptionOptionsBuilder WithReplicationOptions(ReplicationSlotManageme } [UsedImplicitly] - public SubscriptionOptionsBuilder Consumes(TU handler) where T : class - where TU : class, IHandler + public SubscriptionOptionsBuilder Consumes(IHandler handler) where T : class { _registry.TryAdd(typeof(T), handler); return this; @@ -99,7 +88,7 @@ public SubscriptionOptionsBuilder WithErrorProcessor(IErrorProcessor? errorProce internal ISubscriptionOptions Build() { - _messageTable ??= TableDescriptorBuilder.Build(); + _messageTable ??= _tableDescriptorBuilder.Build(); ArgumentNullException.ThrowIfNull(_connectionStringBuilder); ArgumentNullException.ThrowIfNull(_dataSource); ArgumentNullException.ThrowIfNull(_jsonSerializerContext); @@ -121,11 +110,11 @@ internal ISubscriptionOptions Build() _errorProcessor ?? new ConsoleOutErrorProcessor(), _dataMapper, _registry); + static void Ensure(Func> evalFn, string formattedMsg) { var misses = evalFn().ToArray(); if (misses.Length > 0) throw new Exception(string.Format(formattedMsg, string.Join(", ", misses.Select(t => $"'{t.Name}'")))); } - } } diff --git a/src/Publisher/Contracts.cs b/src/Publisher/Contracts.cs index e9995e9..8a10b46 100644 --- a/src/Publisher/Contracts.cs +++ b/src/Publisher/Contracts.cs @@ -16,15 +16,21 @@ internal record UserDeleted( string Name = "Deleted" ): IContract; -[MessageUrn("user-modified:v1")] //subscription ignored +[MessageUrn("user-modified:v1")] internal record UserModified( Guid Id, string Name = "Modified" ): IContract; +[MessageUrn("user-subscribed:v1")] +internal record UserSubscribed( + Guid Id, + string Name = "Subscribed" +): IContract; [JsonSourceGenerationOptions(WriteIndented = true)] [JsonSerializable(typeof(UserCreated))] [JsonSerializable(typeof(UserDeleted))] [JsonSerializable(typeof(UserModified))] +[JsonSerializable(typeof(UserSubscribed))] internal partial class SourceGenerationContext: JsonSerializerContext; diff --git a/src/Publisher/Program.cs b/src/Publisher/Program.cs index adc4a9a..41da1a0 100644 --- a/src/Publisher/Program.cs +++ b/src/Publisher/Program.cs @@ -6,6 +6,7 @@ using UserCreated = Publisher.UserCreated; using UserDeleted = Publisher.UserDeleted; using UserModified = Publisher.UserModified; +using UserSubscribed = Publisher.UserSubscribed; Console.Title = typeof(Program).Assembly.GetName().Name!; Console.WriteLine("How many messages do you want to publish?(press CTRL+C to exit):"); @@ -22,7 +23,7 @@ if (line != null && int.TryParse(line, out var result)) { var cts = new CancellationTokenSource(); - var messages = result / 3; + var messages = result / 4; var ct = cts.Token; var connection = new NpgsqlConnection(Settings.ConnectionString); await using var connection1 = connection.ConfigureAwait(false); @@ -30,15 +31,17 @@ //use a command for each message { var @events = Enumerable.Range(0, result).Select(i => - (i % 3) switch + (i % 4) switch { 0 => new UserCreated(Guid.NewGuid()) as object, 1 => new UserDeleted(Guid.NewGuid()), - _ => new UserModified(Guid.NewGuid()) + 2 => new UserModified(Guid.NewGuid()), + _ => new UserSubscribed(Guid.NewGuid()) }); await Console.Out.WriteLineAsync($"Publishing {messages + ((result % 3 > 0) ? 1 : 0)} {nameof(UserCreated)}"); await Console.Out.WriteLineAsync($"Publishing {messages + ((result % 3 > 1) ? 1 : 0)} {nameof(UserDeleted)}"); - await Console.Out.WriteLineAsync($"Publishing {messages} {nameof(UserModified)}"); + await Console.Out.WriteLineAsync($"Publishing {messages + ((result % 3 > 2) ? 1 : 0)} {nameof(UserModified)}"); + await Console.Out.WriteLineAsync($"Publishing {messages} {nameof(UserSubscribed)}"); foreach (var @event in @events) { var transaction = await connection.BeginTransactionAsync(ct).ConfigureAwait(false); @@ -55,6 +58,9 @@ case UserModified m: await MessageAppender.AppendAsync(m, resolver, connection, transaction, ct).ConfigureAwait(false); break; + case UserSubscribed m: + await MessageAppender.AppendAsync(m, resolver, connection, transaction, ct).ConfigureAwait(false); + break; } await transaction.CommitAsync(ct).ConfigureAwait(false); diff --git a/src/Subscriber/Program.cs b/src/Subscriber/Program.cs index 8c8e057..1d129f8 100644 --- a/src/Subscriber/Program.cs +++ b/src/Subscriber/Program.cs @@ -34,8 +34,8 @@ ) .NamingPolicy(new AttributeNamingPolicy()) .JsonContext(SourceGenerationContext.Default) - .Consumes(consumer) - .Consumes(consumer), ct:ct + .Consumes(consumer) + .Consumes(consumer), ct:ct ).GetAsyncEnumerator(ct); await using var cursor1 = cursor.ConfigureAwait(false); while (await cursor.MoveNextAsync().ConfigureAwait(false) && !ct.IsCancellationRequested); diff --git a/src/SubscriberWorker/Contracts.cs b/src/SubscriberWorker/Contracts.cs index 6fc3d0d..a3c501a 100644 --- a/src/SubscriberWorker/Contracts.cs +++ b/src/SubscriberWorker/Contracts.cs @@ -15,8 +15,15 @@ public record UserDeletedContract( string Name ); + [MessageUrn("user-modified:v1")] //subscription ignored + public record UserModifiedContract( + Guid Id, + string Name = "Modified" + ); + [JsonSourceGenerationOptions(WriteIndented = true)] [JsonSerializable(typeof(UserCreatedContract))] [JsonSerializable(typeof(UserDeletedContract))] + [JsonSerializable(typeof(UserModifiedContract))] internal partial class SourceGenerationContext: JsonSerializerContext; } diff --git a/src/SubscriberWorker/Handler.cs b/src/SubscriberWorker/Handler.cs index 070fc8c..3ecf3fb 100644 --- a/src/SubscriberWorker/Handler.cs +++ b/src/SubscriberWorker/Handler.cs @@ -4,21 +4,34 @@ namespace SubscriberWorker; - -public class Handler(ILogger logger): IHandler where T : class +public class HandlerBase(ILogger logger) { - private Task ReportSuccess(int count) + private Task ReportSuccess(int count) { - if(logger.IsEnabled(LogLevel.Debug)) + if (logger.IsEnabled(LogLevel.Debug)) logger.LogDebug($"Read #{count} messages {typeof(T).FullName}"); return Task.CompletedTask; } private int _counter; private int _completed; - public Task Handle(T value) - => Interlocked.Increment(ref _counter) % 10 == 0 + protected Task Handle(T value) => + Interlocked.Increment(ref _counter) % 10 == 0 //Simulating some exception on out of process dependencies - ? Task.FromException(new Exception($"Error on publishing {nameof(T)}")) - : ReportSuccess(Interlocked.Increment(ref _completed)); + ? Task.FromException(new Exception($"Error on publishing {typeof(T).FullName}")) + : ReportSuccess(Interlocked.Increment(ref _completed)); +} + +public class HandleImpl2(ILogger logger): HandlerBase(logger), + IHandler +{ + public Task Handle(UserDeletedContract value) => Handle(value); +} + +public class HandleImpl1(ILogger logger) : HandlerBase(logger), + IHandler, IHandler +{ + public Task Handle(UserCreatedContract value) => Handle(value); + + public Task Handle(UserModifiedContract value) => Handle(value); } diff --git a/src/SubscriberWorker/Program.cs b/src/SubscriberWorker/Program.cs index 3349b40..33a5188 100644 --- a/src/SubscriberWorker/Program.cs +++ b/src/SubscriberWorker/Program.cs @@ -10,6 +10,8 @@ using Polly; using SubscriberWorker; using Npgsql; +using Blumchen.Subscriptions.Management; +using Polly.Registry; #pragma warning disable CS8601 // Possible null reference assignment. @@ -25,17 +27,16 @@ var builder = Host.CreateApplicationBuilder(args); builder.Services - .AddBlumchen, UserCreatedContract>() - .AddSingleton, Handler>() - .AddBlumchen, UserDeletedContract>() - .AddSingleton, Handler>() - .AddSingleton(Settings.ConnectionString) - .AddTransient(sp => - new NpgsqlDataSourceBuilder(Settings.ConnectionString) - .UseLoggerFactory(sp.GetRequiredService()).Build()) + + .AddSingleton, HandleImpl1>() + .AddSingleton, HandleImpl1>() + .AddSingleton, HandleImpl2>() + .AddSingleton() + .AddSingleton() .AddSingleton() + .AddResiliencePipeline("default", (pipelineBuilder, _) => pipelineBuilder .AddRetry(new RetryStrategyOptions @@ -44,6 +45,7 @@ Delay = TimeSpan.FromSeconds(5), MaxRetryAttempts = int.MaxValue }).Build()) + .AddLogging(loggingBuilder => { loggingBuilder @@ -53,7 +55,42 @@ .AddFilter("Blumchen", LogLevel.Debug) .AddFilter("SubscriberWorker", LogLevel.Debug) .AddSimpleConsole(); - }).AddSingleton(sp => sp.GetRequiredService().CreateLogger()); + }) + .AddTransient(sp => + new NpgsqlDataSourceBuilder(Settings.ConnectionString) + .UseLoggerFactory(sp.GetRequiredService()).Build()) + + .AddBlumchen((provider, workerOptions) => + workerOptions + .Subscription(subscriptionOptions => + subscriptionOptions + .ConnectionString(Settings.ConnectionString) + .DataSource(provider.GetRequiredService()) + .WithReplicationOptions(new ReplicationSlotManagement.ReplicationSlotSetupOptions($"{nameof(HandleImpl1)}_slot")) + .WithPublicationOptions(new PublicationManagement.PublicationSetupOptions($"{nameof(HandleImpl1)}_pub")) + .WithErrorProcessor(provider.GetRequiredService()) + .NamingPolicy(provider.GetRequiredService()) + .JsonContext(SourceGenerationContext.Default) + .Consumes(provider.GetRequiredService>()) + .Consumes(provider.GetRequiredService>())) + .ResiliencyPipeline(provider.GetRequiredService>().GetPipeline("default")) + + ) + + .AddBlumchen((provider, workerOptions) => + workerOptions + .Subscription(subscriptionOptions => + subscriptionOptions.ConnectionString(Settings.ConnectionString) + .DataSource(provider.GetRequiredService()) + .WithReplicationOptions(new ReplicationSlotManagement.ReplicationSlotSetupOptions($"{nameof(HandleImpl2)}_slot")) + .WithPublicationOptions(new PublicationManagement.PublicationSetupOptions($"{nameof(HandleImpl2)}_pub")) + .WithErrorProcessor(provider.GetRequiredService()) + .NamingPolicy(provider.GetRequiredService()) + .JsonContext(SourceGenerationContext.Default) + .Consumes(provider.GetRequiredService>())) + .ResiliencyPipeline(provider.GetRequiredService>().GetPipeline("default")) + ) + ; await builder .Build() diff --git a/src/SubscriberWorker/SubscriberWorker.cs b/src/SubscriberWorker/SubscriberWorker.cs deleted file mode 100644 index cb28195..0000000 --- a/src/SubscriberWorker/SubscriberWorker.cs +++ /dev/null @@ -1,31 +0,0 @@ -using System.Text.Json.Serialization; -using Blumchen.Serialization; -using Blumchen.Subscriptions; -using Blumchen.Subscriptions.Management; -using Blumchen.Workers; -using Microsoft.Extensions.Logging; -using Npgsql; -using Polly.Registry; -// ReSharper disable ClassNeverInstantiated.Global - -namespace SubscriberWorker; -public class SubscriberWorker( - NpgsqlDataSource dataSource, - string connectionString, - IHandler handler, - JsonSerializerContext jsonSerializerContext, - ResiliencePipelineProvider pipelineProvider, - INamingPolicy namingPolicy, - IErrorProcessor errorProcessor, - ILogger logger -): Worker(dataSource - , connectionString - , handler - , jsonSerializerContext - , errorProcessor - , pipelineProvider.GetPipeline("default") - , namingPolicy - , new PublicationManagement.PublicationSetupOptions($"{typeof(T).Name}_pub") - , new ReplicationSlotManagement.ReplicationSlotSetupOptions($"{typeof(T).Name}_slot") - , tableDescriptorBuilder => tableDescriptorBuilder.UseDefaults() - , logger) where T : class; diff --git a/src/Tests/DatabaseFixture.cs b/src/Tests/DatabaseFixture.cs index 78e1525..1b2f1af 100644 --- a/src/Tests/DatabaseFixture.cs +++ b/src/Tests/DatabaseFixture.cs @@ -85,7 +85,7 @@ protected static async Task InsertPoisoningMessage(string connectionString, stri .ConnectionString(connectionString) .JsonContext(info) .NamingPolicy(namingPolicy) - .Consumes>(consumer) + .Consumes(consumer) .WithTable(o => o.Name(eventsTable)) .WithPublicationOptions( new PublicationManagement.PublicationSetupOptions(PublicationName: publicationName ?? Randomise("events_pub")) From ca28054932dd18a6b3abd30e308aad82994edb9f Mon Sep 17 00:00:00 2001 From: giordanol Date: Tue, 16 Jul 2024 19:28:05 +0200 Subject: [PATCH 20/28] collapse project --- Blumchen.sln | 6 ------ .../Blumchen.DependencyInjection.csproj | 2 +- src/Blumchen/Blumchen.csproj | 4 +++- .../Workers/ServiceCollectionExtensions.cs | 3 +-- .../Workers/Worker.cs | 0 .../Workers/WorkerOptionsBuilder.cs | 0 src/SubscriberWorker/Program.cs | 2 +- src/SubscriberWorker/SubscriberWorker.csproj | 1 - 8 files changed, 6 insertions(+), 12 deletions(-) rename src/{Blumchen.DependencyInjection => Blumchen}/Workers/ServiceCollectionExtensions.cs (87%) rename src/{Blumchen.DependencyInjection => Blumchen}/Workers/Worker.cs (100%) rename src/{Blumchen.DependencyInjection => Blumchen}/Workers/WorkerOptionsBuilder.cs (100%) diff --git a/Blumchen.sln b/Blumchen.sln index fd99d48..f8510ef 100644 --- a/Blumchen.sln +++ b/Blumchen.sln @@ -43,8 +43,6 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "postgres", "postgres", "{8A EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "demo", "demo", "{A4044484-FE08-4399-8239-14AABFA30AD7}" EndProject -Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Blumchen.DependencyInjection", "src\Blumchen.DependencyInjection\Blumchen.DependencyInjection.csproj", "{A07167E3-4CF7-40EF-8E55-A37A0F57B89D}" -EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SubscriberWorker", "src\SubscriberWorker\SubscriberWorker.csproj", "{DB58DB36-0366-4ABA-BC06-FCA9BB10EB92}" EndProject Global @@ -73,10 +71,6 @@ Global {2BBDA071-FB1C-4D62-A954-B22EA6B1C738}.Debug|Any CPU.Build.0 = Debug|Any CPU {2BBDA071-FB1C-4D62-A954-B22EA6B1C738}.Release|Any CPU.ActiveCfg = Release|Any CPU {2BBDA071-FB1C-4D62-A954-B22EA6B1C738}.Release|Any CPU.Build.0 = Release|Any CPU - {A07167E3-4CF7-40EF-8E55-A37A0F57B89D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {A07167E3-4CF7-40EF-8E55-A37A0F57B89D}.Debug|Any CPU.Build.0 = Debug|Any CPU - {A07167E3-4CF7-40EF-8E55-A37A0F57B89D}.Release|Any CPU.ActiveCfg = Release|Any CPU - {A07167E3-4CF7-40EF-8E55-A37A0F57B89D}.Release|Any CPU.Build.0 = Release|Any CPU {DB58DB36-0366-4ABA-BC06-FCA9BB10EB92}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {DB58DB36-0366-4ABA-BC06-FCA9BB10EB92}.Debug|Any CPU.Build.0 = Debug|Any CPU {DB58DB36-0366-4ABA-BC06-FCA9BB10EB92}.Release|Any CPU.ActiveCfg = Release|Any CPU diff --git a/src/Blumchen.DependencyInjection/Blumchen.DependencyInjection.csproj b/src/Blumchen.DependencyInjection/Blumchen.DependencyInjection.csproj index bb407ba..5c7b408 100644 --- a/src/Blumchen.DependencyInjection/Blumchen.DependencyInjection.csproj +++ b/src/Blumchen.DependencyInjection/Blumchen.DependencyInjection.csproj @@ -38,7 +38,7 @@ - + all none all diff --git a/src/Blumchen/Blumchen.csproj b/src/Blumchen/Blumchen.csproj index 9847f66..0138080 100644 --- a/src/Blumchen/Blumchen.csproj +++ b/src/Blumchen/Blumchen.csproj @@ -45,13 +45,15 @@ - + all none all + + diff --git a/src/Blumchen.DependencyInjection/Workers/ServiceCollectionExtensions.cs b/src/Blumchen/Workers/ServiceCollectionExtensions.cs similarity index 87% rename from src/Blumchen.DependencyInjection/Workers/ServiceCollectionExtensions.cs rename to src/Blumchen/Workers/ServiceCollectionExtensions.cs index 0c8a6a0..6950b04 100644 --- a/src/Blumchen.DependencyInjection/Workers/ServiceCollectionExtensions.cs +++ b/src/Blumchen/Workers/ServiceCollectionExtensions.cs @@ -1,7 +1,6 @@ using Blumchen.Subscriptions; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; -using Polly; #pragma warning disable IL2091 @@ -18,7 +17,7 @@ public static IServiceCollection AddBlumchen( .AddKeyedSingleton(typeof(T), (provider, _) => workerOptions(provider, new WorkerOptionsBuilder()).Build()) .AddHostedService(provider => new Worker(workerOptions(provider, new WorkerOptionsBuilder()).Build(), - provider.GetRequiredService>>())); + ServiceProviderServiceExtensions.GetRequiredService>>(provider))); } diff --git a/src/Blumchen.DependencyInjection/Workers/Worker.cs b/src/Blumchen/Workers/Worker.cs similarity index 100% rename from src/Blumchen.DependencyInjection/Workers/Worker.cs rename to src/Blumchen/Workers/Worker.cs diff --git a/src/Blumchen.DependencyInjection/Workers/WorkerOptionsBuilder.cs b/src/Blumchen/Workers/WorkerOptionsBuilder.cs similarity index 100% rename from src/Blumchen.DependencyInjection/Workers/WorkerOptionsBuilder.cs rename to src/Blumchen/Workers/WorkerOptionsBuilder.cs diff --git a/src/SubscriberWorker/Program.cs b/src/SubscriberWorker/Program.cs index 33a5188..735b15d 100644 --- a/src/SubscriberWorker/Program.cs +++ b/src/SubscriberWorker/Program.cs @@ -1,7 +1,6 @@ using System.Text.Json.Serialization; using Blumchen.Serialization; using Blumchen.Subscriptions; -using Blumchen.Workers; using Commons; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; @@ -11,6 +10,7 @@ using SubscriberWorker; using Npgsql; using Blumchen.Subscriptions.Management; +using Blumchen.Workers; using Polly.Registry; diff --git a/src/SubscriberWorker/SubscriberWorker.csproj b/src/SubscriberWorker/SubscriberWorker.csproj index cfe99a0..ad5c3f6 100644 --- a/src/SubscriberWorker/SubscriberWorker.csproj +++ b/src/SubscriberWorker/SubscriberWorker.csproj @@ -16,7 +16,6 @@ - From f4cfe0cf8a844345696b802f31ee93d6df48a980 Mon Sep 17 00:00:00 2001 From: giordanol Date: Tue, 16 Jul 2024 19:28:25 +0200 Subject: [PATCH 21/28] rename folder --- .../ServiceCollectionExtensions.cs | 2 +- src/Blumchen/{Workers => DependencyInjection}/Worker.cs | 2 +- .../{Workers => DependencyInjection}/WorkerOptionsBuilder.cs | 2 +- src/SubscriberWorker/Program.cs | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) rename src/Blumchen/{Workers => DependencyInjection}/ServiceCollectionExtensions.cs (95%) rename src/Blumchen/{Workers => DependencyInjection}/Worker.cs (98%) rename src/Blumchen/{Workers => DependencyInjection}/WorkerOptionsBuilder.cs (96%) diff --git a/src/Blumchen/Workers/ServiceCollectionExtensions.cs b/src/Blumchen/DependencyInjection/ServiceCollectionExtensions.cs similarity index 95% rename from src/Blumchen/Workers/ServiceCollectionExtensions.cs rename to src/Blumchen/DependencyInjection/ServiceCollectionExtensions.cs index 6950b04..c01d5ab 100644 --- a/src/Blumchen/Workers/ServiceCollectionExtensions.cs +++ b/src/Blumchen/DependencyInjection/ServiceCollectionExtensions.cs @@ -4,7 +4,7 @@ #pragma warning disable IL2091 -namespace Blumchen.Workers; +namespace Blumchen.DependencyInjection; public static class ServiceCollectionExtensions { diff --git a/src/Blumchen/Workers/Worker.cs b/src/Blumchen/DependencyInjection/Worker.cs similarity index 98% rename from src/Blumchen/Workers/Worker.cs rename to src/Blumchen/DependencyInjection/Worker.cs index dbca462..4f137b7 100644 --- a/src/Blumchen/Workers/Worker.cs +++ b/src/Blumchen/DependencyInjection/Worker.cs @@ -3,7 +3,7 @@ using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; -namespace Blumchen.Workers; +namespace Blumchen.DependencyInjection; public class Worker( WorkerOptions options, diff --git a/src/Blumchen/Workers/WorkerOptionsBuilder.cs b/src/Blumchen/DependencyInjection/WorkerOptionsBuilder.cs similarity index 96% rename from src/Blumchen/Workers/WorkerOptionsBuilder.cs rename to src/Blumchen/DependencyInjection/WorkerOptionsBuilder.cs index f146b75..07c79c5 100644 --- a/src/Blumchen/Workers/WorkerOptionsBuilder.cs +++ b/src/Blumchen/DependencyInjection/WorkerOptionsBuilder.cs @@ -1,7 +1,7 @@ using Blumchen.Subscriptions; using Polly; -namespace Blumchen.Workers; +namespace Blumchen.DependencyInjection; public record WorkerOptions(ResiliencePipeline ResiliencePipeline, ISubscriptionOptions SubscriptionOptions); diff --git a/src/SubscriberWorker/Program.cs b/src/SubscriberWorker/Program.cs index 735b15d..2c13d81 100644 --- a/src/SubscriberWorker/Program.cs +++ b/src/SubscriberWorker/Program.cs @@ -1,4 +1,5 @@ using System.Text.Json.Serialization; +using Blumchen.DependencyInjection; using Blumchen.Serialization; using Blumchen.Subscriptions; using Commons; @@ -10,7 +11,6 @@ using SubscriberWorker; using Npgsql; using Blumchen.Subscriptions.Management; -using Blumchen.Workers; using Polly.Registry; From f9bf48f7c57240506f700b653d731b6aa15695f0 Mon Sep 17 00:00:00 2001 From: giordanol Date: Wed, 17 Jul 2024 10:10:26 +0200 Subject: [PATCH 22/28] mark as implicit usage --- src/Blumchen/MessageTableOptions.cs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/Blumchen/MessageTableOptions.cs b/src/Blumchen/MessageTableOptions.cs index b038de3..ec91b9b 100644 --- a/src/Blumchen/MessageTableOptions.cs +++ b/src/Blumchen/MessageTableOptions.cs @@ -1,4 +1,5 @@ using Blumchen.Subscriptions; +using JetBrains.Annotations; using NpgsqlTypes; namespace Blumchen; @@ -33,6 +34,7 @@ public TableDescriptorBuilder MessageType(string name, int dimension = 250) return this; } + [UsedImplicitly] public TableDescriptorBuilder UseDefaults() => this; public record MessageTable(string Name = MessageTable.DefaultName) From 7fff1b4f8dbef35b6893c4e4a6e3b1223ebadba8 Mon Sep 17 00:00:00 2001 From: giordanol Date: Wed, 17 Jul 2024 10:11:46 +0200 Subject: [PATCH 23/28] switch to prepared statement --- src/Blumchen/Publications/MessageAppender.cs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/Blumchen/Publications/MessageAppender.cs b/src/Blumchen/Publications/MessageAppender.cs index c927dfe..4c4054f 100644 --- a/src/Blumchen/Publications/MessageAppender.cs +++ b/src/Blumchen/Publications/MessageAppender.cs @@ -35,11 +35,13 @@ private static async Task AppendAsyncOfT(T input { var (typeName, jsonTypeInfo) = typeResolver.Resolve(typeof(T)); var data = JsonSerialization.ToJson(@input, jsonTypeInfo); - var command = new NpgsqlCommand( + + await using var command = new NpgsqlCommand( $"INSERT INTO {tableDescriptor.Name}({tableDescriptor.MessageType.Name}, {tableDescriptor.Data.Name}) values ('{typeName}', '{data}')", connection, transaction ); + await command.PrepareAsync(ct).ConfigureAwait(false); await command.ExecuteNonQueryAsync(ct).ConfigureAwait(false); } From f0c41607e5e976841d5a66e9df17db9cd1acc3a8 Mon Sep 17 00:00:00 2001 From: giordanol Date: Wed, 17 Jul 2024 10:13:43 +0200 Subject: [PATCH 24/28] dispose resources --- src/Blumchen/Publications/MessageAppender.cs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/Blumchen/Publications/MessageAppender.cs b/src/Blumchen/Publications/MessageAppender.cs index 4c4054f..66b7fdd 100644 --- a/src/Blumchen/Publications/MessageAppender.cs +++ b/src/Blumchen/Publications/MessageAppender.cs @@ -55,12 +55,12 @@ public static async Task AppendAsync(T input var (typeName, jsonTypeInfo) = options.resolver.Resolve(type); var data = JsonSerialization.ToJson(input, jsonTypeInfo); - var connection = new NpgsqlConnection(connectionString); - await using var connection1 = connection.ConfigureAwait(false); + await using var connection = new NpgsqlConnection(connectionString); await connection.OpenAsync(ct).ConfigureAwait(false); - var command = connection.CreateCommand(); + await using var command = connection.CreateCommand(); command.CommandText = $"INSERT INTO {options.tableDescriptor.Name}({options.tableDescriptor.MessageType.Name}, {options.tableDescriptor.Data.Name}) values ('{typeName}', '{data}')"; + await command.PrepareAsync(ct).ConfigureAwait(false); await command.ExecuteNonQueryAsync(ct).ConfigureAwait(false); } @@ -71,7 +71,7 @@ private static async Task AppendBatchAsyncOfT(T inputs , NpgsqlTransaction transaction , CancellationToken ct) where T : class, IEnumerable { - var batch = new NpgsqlBatch(connection, transaction); + await using var batch = new NpgsqlBatch(connection, transaction); foreach (var input in inputs) { var (typeName, jsonTypeInfo) = resolver.Resolve(input.GetType()); From 8db4187b21d63b94724c7d67b06264bd2fb0ca9a Mon Sep 17 00:00:00 2001 From: giordanol Date: Wed, 17 Jul 2024 10:21:59 +0200 Subject: [PATCH 25/28] explicit defaults --- src/Publisher/Program.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/Publisher/Program.cs b/src/Publisher/Program.cs index 41da1a0..a29d944 100644 --- a/src/Publisher/Program.cs +++ b/src/Publisher/Program.cs @@ -14,6 +14,7 @@ var resolver = new PublisherSetupOptionsBuilder() .JsonContext(SourceGenerationContext.Default) .NamingPolicy(new AttributeNamingPolicy()) + .WithTable(builder => builder.UseDefaults())//default, but explicit .Build(); do From c446721f3f68c3c4d9a90f41df052c5fe010a9f2 Mon Sep 17 00:00:00 2001 From: giordanol Date: Wed, 17 Jul 2024 10:23:23 +0200 Subject: [PATCH 26/28] add PublisherOptions --- src/Blumchen/Publications/MessageAppender.cs | 12 ++++++------ src/Blumchen/Publications/PublisherOptions.cs | 5 +++++ .../Publications/PublisherSetupOptionsBuilder.cs | 4 ++-- 3 files changed, 13 insertions(+), 8 deletions(-) create mode 100644 src/Blumchen/Publications/PublisherOptions.cs diff --git a/src/Blumchen/Publications/MessageAppender.cs b/src/Blumchen/Publications/MessageAppender.cs index 66b7fdd..860d80b 100644 --- a/src/Blumchen/Publications/MessageAppender.cs +++ b/src/Blumchen/Publications/MessageAppender.cs @@ -7,7 +7,7 @@ namespace Blumchen.Publications; public static class MessageAppender { public static async Task AppendAsync(T @input - , (TableDescriptorBuilder.MessageTable tableDescriptor, IJsonTypeResolver jsonTypeResolver) resolver + , PublisherOptions resolver , NpgsqlConnection connection , NpgsqlTransaction transaction , CancellationToken ct @@ -18,10 +18,10 @@ public static async Task AppendAsync(T @input case null: throw new ArgumentNullException(nameof(@input)); case IEnumerable inputs: - await AppendBatchAsyncOfT(inputs, resolver.tableDescriptor, resolver.jsonTypeResolver, connection, transaction, ct).ConfigureAwait(false); + await AppendBatchAsyncOfT(inputs, resolver.TableDescriptor, resolver.JsonTypeResolver, connection, transaction, ct).ConfigureAwait(false); break; default: - await AppendAsyncOfT(input, resolver.tableDescriptor, resolver.jsonTypeResolver, connection, transaction, ct).ConfigureAwait(false); + await AppendAsyncOfT(input, resolver.TableDescriptor, resolver.JsonTypeResolver, connection, transaction, ct).ConfigureAwait(false); break; } } @@ -46,20 +46,20 @@ private static async Task AppendAsyncOfT(T input } public static async Task AppendAsync(T input - , (TableDescriptorBuilder.MessageTable tableDescriptor, IJsonTypeResolver resolver) options + , PublisherOptions options , string connectionString , CancellationToken ct) where T: class { var type = typeof(T); - var (typeName, jsonTypeInfo) = options.resolver.Resolve(type); + var (typeName, jsonTypeInfo) = options.JsonTypeResolver.Resolve(type); var data = JsonSerialization.ToJson(input, jsonTypeInfo); await using var connection = new NpgsqlConnection(connectionString); await connection.OpenAsync(ct).ConfigureAwait(false); await using var command = connection.CreateCommand(); command.CommandText = - $"INSERT INTO {options.tableDescriptor.Name}({options.tableDescriptor.MessageType.Name}, {options.tableDescriptor.Data.Name}) values ('{typeName}', '{data}')"; + $"INSERT INTO {options.TableDescriptor.Name}({options.TableDescriptor.MessageType.Name}, {options.TableDescriptor.Data.Name}) values ('{typeName}', '{data}')"; await command.PrepareAsync(ct).ConfigureAwait(false); await command.ExecuteNonQueryAsync(ct).ConfigureAwait(false); } diff --git a/src/Blumchen/Publications/PublisherOptions.cs b/src/Blumchen/Publications/PublisherOptions.cs new file mode 100644 index 0000000..d6bdeed --- /dev/null +++ b/src/Blumchen/Publications/PublisherOptions.cs @@ -0,0 +1,5 @@ +using Blumchen.Serialization; + +namespace Blumchen.Publications; + +public record PublisherOptions(TableDescriptorBuilder.MessageTable TableDescriptor, IJsonTypeResolver JsonTypeResolver); diff --git a/src/Blumchen/Publications/PublisherSetupOptionsBuilder.cs b/src/Blumchen/Publications/PublisherSetupOptionsBuilder.cs index dbd7095..e42c4a2 100644 --- a/src/Blumchen/Publications/PublisherSetupOptionsBuilder.cs +++ b/src/Blumchen/Publications/PublisherSetupOptionsBuilder.cs @@ -33,7 +33,7 @@ public PublisherSetupOptionsBuilder WithTable(Func Date: Wed, 17 Jul 2024 10:45:19 +0200 Subject: [PATCH 27/28] Expose shortcut for table validation when bootstrapping publisher --- src/Blumchen/Publications/PublisherOptions.cs | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/src/Blumchen/Publications/PublisherOptions.cs b/src/Blumchen/Publications/PublisherOptions.cs index d6bdeed..d25c048 100644 --- a/src/Blumchen/Publications/PublisherOptions.cs +++ b/src/Blumchen/Publications/PublisherOptions.cs @@ -1,5 +1,20 @@ +using Blumchen.Database; using Blumchen.Serialization; +using Npgsql; namespace Blumchen.Publications; public record PublisherOptions(TableDescriptorBuilder.MessageTable TableDescriptor, IJsonTypeResolver JsonTypeResolver); + +public static class PublisherOptionsExtensions +{ + public static async Task EnsureTable(this PublisherOptions publisherOptions, NpgsqlDataSource dataSource, CancellationToken ct) + { + await dataSource.EnsureTableExists(publisherOptions.TableDescriptor, ct); + return publisherOptions; + } + + public static Task EnsureTable(this PublisherOptions publisherOptions, + string connectionString, CancellationToken ct) + => EnsureTable(publisherOptions, new NpgsqlDataSourceBuilder(connectionString).Build(), ct); +} From 2351af18bccd92b84de778aa49a06788bc8e68a5 Mon Sep 17 00:00:00 2001 From: giordanol Date: Wed, 17 Jul 2024 10:46:25 +0200 Subject: [PATCH 28/28] explain different available to publisher for validating table --- src/Publisher/Program.cs | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/src/Publisher/Program.cs b/src/Publisher/Program.cs index a29d944..cdbccc9 100644 --- a/src/Publisher/Program.cs +++ b/src/Publisher/Program.cs @@ -1,3 +1,4 @@ +using Blumchen.Database; using Blumchen.Publications; using Blumchen.Serialization; using Commons; @@ -10,12 +11,7 @@ Console.Title = typeof(Program).Assembly.GetName().Name!; Console.WriteLine("How many messages do you want to publish?(press CTRL+C to exit):"); - -var resolver = new PublisherSetupOptionsBuilder() - .JsonContext(SourceGenerationContext.Default) - .NamingPolicy(new AttributeNamingPolicy()) - .WithTable(builder => builder.UseDefaults())//default, but explicit - .Build(); +var cts = new CancellationTokenSource(); do { @@ -23,7 +19,19 @@ var line = Console.ReadLine(); if (line != null && int.TryParse(line, out var result)) { - var cts = new CancellationTokenSource(); + var resolver = await new PublisherSetupOptionsBuilder() + .JsonContext(SourceGenerationContext.Default) + .NamingPolicy(new AttributeNamingPolicy()) + .WithTable(builder => builder.UseDefaults()) //default, but explicit + .Build() + .EnsureTable(Settings.ConnectionString, cts.Token)//enforce table existence and conformity - db roundtrip + .ConfigureAwait(false); + + //Or you might want to verify at a later stage + await new NpgsqlDataSourceBuilder(Settings.ConnectionString) + .Build() + .EnsureTableExists(resolver.TableDescriptor, cts.Token).ConfigureAwait(false); + var messages = result / 4; var ct = cts.Token; var connection = new NpgsqlConnection(Settings.ConnectionString);