diff --git a/Blumchen.sln b/Blumchen.sln index a5020ed..f2618e8 100644 --- a/Blumchen.sln +++ b/Blumchen.sln @@ -36,13 +36,17 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "pgAdmin", "pgAdmin", "{C050 docker\pgAdmin\servers.json = docker\pgAdmin\servers.json EndProjectSection EndProject -Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "postgres", "postgres", "{8AAAA344-B5FD-48D9-B2BA-379E374448D4}" +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "demo", "demo", "{A4044484-FE08-4399-8239-14AABFA30AD7}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SubscriberWorker", "src\SubscriberWorker\SubscriberWorker.csproj", "{DB58DB36-0366-4ABA-BC06-FCA9BB10EB92}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "UnitTests", "src\UnitTests\UnitTests.csproj", "{B16305B4-8AC3-4435-AADB-D9E2ACAA1C13}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "scripts", "scripts", "{13543764-C1E1-482E-887A-816ABA4CB71C}" ProjectSection(SolutionItems) = preProject - docker\postgres\init.sql = docker\postgres\init.sql + scripts\autoheal.ps1 = scripts\autoheal.ps1 EndProjectSection EndProject -Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "demo", "demo", "{A4044484-FE08-4399-8239-14AABFA30AD7}" -EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -69,6 +73,14 @@ Global {2BBDA071-FB1C-4D62-A954-B22EA6B1C738}.Debug|Any CPU.Build.0 = Debug|Any CPU {2BBDA071-FB1C-4D62-A954-B22EA6B1C738}.Release|Any CPU.ActiveCfg = Release|Any CPU {2BBDA071-FB1C-4D62-A954-B22EA6B1C738}.Release|Any CPU.Build.0 = Release|Any CPU + {DB58DB36-0366-4ABA-BC06-FCA9BB10EB92}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {DB58DB36-0366-4ABA-BC06-FCA9BB10EB92}.Debug|Any CPU.Build.0 = Debug|Any CPU + {DB58DB36-0366-4ABA-BC06-FCA9BB10EB92}.Release|Any CPU.ActiveCfg = Release|Any CPU + {DB58DB36-0366-4ABA-BC06-FCA9BB10EB92}.Release|Any CPU.Build.0 = Release|Any CPU + {B16305B4-8AC3-4435-AADB-D9E2ACAA1C13}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {B16305B4-8AC3-4435-AADB-D9E2ACAA1C13}.Debug|Any CPU.Build.0 = Debug|Any CPU + {B16305B4-8AC3-4435-AADB-D9E2ACAA1C13}.Release|Any CPU.ActiveCfg = Release|Any CPU + {B16305B4-8AC3-4435-AADB-D9E2ACAA1C13}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -78,7 +90,7 @@ Global {F2878625-0919-4C26-8DC9-58CD8FA34050} = {A4044484-FE08-4399-8239-14AABFA30AD7} {F81E2D5B-FC59-4396-A911-56BE65E4FE80} = {A4044484-FE08-4399-8239-14AABFA30AD7} {C050E9E8-3FB6-4581-953F-31826E385FB4} = {CD59A1A0-F40D-4047-87A3-66C0F1519FA5} - {8AAAA344-B5FD-48D9-B2BA-379E374448D4} = {CD59A1A0-F40D-4047-87A3-66C0F1519FA5} + {DB58DB36-0366-4ABA-BC06-FCA9BB10EB92} = {A4044484-FE08-4399-8239-14AABFA30AD7} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {9A868C51-0460-4700-AF33-E1A921192614} diff --git a/README.md b/README.md index 48856b2..36fd616 100644 --- a/README.md +++ b/README.md @@ -18,7 +18,7 @@ Main logic is placed in [EventsSubscription](./src/Blumchen/Subscriptions/Subscr ```shell docker-compose up ``` -2. Run(order doesn't matter) Publisher and Subscriber apps, under 'demo' folder, from vs-studio, and follow Publisher instructions. +2. Run(order doesn't matter) Publisher and (Subscriber or SubscriberWorker) apps, under 'demo' folder, from vs-studio, and follow Publisher instructions. ## Testing (against default docker instance) diff --git a/docker-compose.yml b/docker-compose.yml index e34c14e..7cbbf38 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -3,6 +3,7 @@ version: '3.8' # Specify Docker Compose version services: postgres: image: postgres:15.1-alpine + container_name: db ports: - "5432:5432" environment: @@ -15,9 +16,8 @@ services: - "wal_level=logical" - "-c" - "wal_compression=on" - volumes: - - ./docker/postgres/init.sql:/docker-entrypoint-initdb.d/init.sql - + - "-c" + - "max_slot_wal_keep_size=1" pgadmin: container_name: pgadmin_container image: dpage/pgadmin4 @@ -36,4 +36,3 @@ services: depends_on: - postgres restart: unless-stopped - diff --git a/docker/postgres/init.sql b/docker/postgres/init.sql deleted file mode 100644 index 5887ecd..0000000 --- a/docker/postgres/init.sql +++ /dev/null @@ -1,6 +0,0 @@ - -CREATE TABLE outbox ( - id BIGINT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY, - message_type VARCHAR(250) NOT NULL, - data JSONB NOT NULL -); diff --git a/scripts/autoheal.ps1 b/scripts/autoheal.ps1 new file mode 100644 index 0000000..bf78150 --- /dev/null +++ b/scripts/autoheal.ps1 @@ -0,0 +1,76 @@ +#usage: .\scripts\autoheal.ps1 -SolutionFile .\Blumchen.sln -ComposeFile .\docker-compose.yml +param( + [string]$SolutionFile, + [string]$ComposeFile +) + + +$env:DOCKER_CLI_HINTS=$false #disable docker hints +Write-Host "Setup infrastrucure" + +try { + + start powershell { + docker compose up; + Read-Host; + + } + + Write-Host "Waiting for container readiness..." + do + { + Start-Sleep -s 5 + $state=$(docker inspect db|ConvertFrom-Json).State + $status=$state.Status + $exitCode=$state.ExitCode + $restart=$state.Restarting + }Until(($status -eq "running") -and ($exitCode -eq 0) -and ($restart -eq $false)) + + Write-Host "...Done" + + Write-Host "Start subscriber" + start powershell { + dotnet run --project ./src/SubscriberWorker/SubscriberWorker.csproj + Read-Host; + } + + Write-Host "Publishing 10 messages to test the subscriptions are working properly: hit ENTER when done!" + + start powershell { + dotnet run --project ./src/Publisher/Publisher.csproj -- -c 10 -t \"UserCreated|UserDeleted|UserModified\"; + } + + Read-Host; + + Write-Host "Start massive insert to force wal segment creation..." + start powershell { + dotnet run --project ./src/Publisher/Publisher.csproj -- -c 800000 -t "UserSubscribed" + } + + Write-Host "Wait for subscribers to auto heal on error...reporting on row insert" + + Start-Sleep -s 15 + do + { + docker exec -it db psql -h localhost -U postgres -w -c "select count(*) from outbox;" + }Until(Read-Host "Enter to report on counting rows(another key to proceed when done)" "") + + Write-Host "Subscribers resiliency tested :-)" + Write-Host "Publishing 10 messages to test the subscriptions are still working properly: hit ENTER when done!" + + start powershell { + dotnet run --project ./src/Publisher/Publisher.csproj -- -c 10 -t \"UserCreated|UserDeleted|UserModified\" + } + Read-Host; + + Write-Host "We're done...: hit ENTER to shut down!" + + Read-Host; + +}catch { + Write-Host "An error occurred:" + Write-Host $_ +} +finally{ + docker compose -f $ComposeFile down --rmi local +} diff --git a/src/Blumchen/Blumchen.csproj b/src/Blumchen/Blumchen.csproj index 4e6b52c..89c220c 100644 --- a/src/Blumchen/Blumchen.csproj +++ b/src/Blumchen/Blumchen.csproj @@ -1,7 +1,7 @@ - + - 0.1.0 + 0.1.1 net8.0 true true @@ -25,20 +25,39 @@ snupkg Blumchen + + + 1591 + + + + + 1591 + + <_Parameter1>Tests + + <_Parameter1>UnitTests + + + <_Parameter1>Blumchen.DependencyInjection + - + + all none all + + diff --git a/src/Blumchen/ConfigurationException.cs b/src/Blumchen/ConfigurationException.cs new file mode 100644 index 0000000..93a24f7 --- /dev/null +++ b/src/Blumchen/ConfigurationException.cs @@ -0,0 +1,3 @@ +namespace Blumchen; + +public class ConfigurationException(string message): Exception(message); diff --git a/src/Blumchen/Database/Run.cs b/src/Blumchen/Database/Run.cs index 4e03b17..6ffc326 100644 --- a/src/Blumchen/Database/Run.cs +++ b/src/Blumchen/Database/Run.cs @@ -1,11 +1,8 @@ using System.Data; using System.Runtime.CompilerServices; using Blumchen.Subscriptions.Replication; -using Blumchen.Subscriptions.ReplicationMessageHandlers; using Npgsql; -#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member - namespace Blumchen.Database; public static class Run @@ -16,11 +13,11 @@ private static async Task Execute( CancellationToken ct) { await using var command = dataSource.CreateCommand(sql); - await command.ExecuteNonQueryAsync(ct).ConfigureAwait(false); + await command.ExecuteNonQueryAsync(ct); } public static async Task EnsureTableExists(this NpgsqlDataSource dataSource, TableDescriptorBuilder.MessageTable tableDescriptor, CancellationToken ct) - => await dataSource.Execute(tableDescriptor.ToString(), ct).ConfigureAwait(false); + => await dataSource.Execute(string.Concat("select pg_advisory_xact_lock(12345);", tableDescriptor), ct).ConfigureAwait(false); public static async Task Exists( this NpgsqlDataSource dataSource, diff --git a/src/Blumchen/DependencyInjection/LoggerExtensions.cs b/src/Blumchen/DependencyInjection/LoggerExtensions.cs new file mode 100644 index 0000000..721690f --- /dev/null +++ b/src/Blumchen/DependencyInjection/LoggerExtensions.cs @@ -0,0 +1,18 @@ +using Blumchen.Subscriptions.Replication; +using Microsoft.Extensions.Logging; + +namespace Blumchen.DependencyInjection; + +internal static partial class LoggerExtensions + +{ + [LoggerMessage(Message = "{workerName} started", Level = LogLevel.Information)] + public static partial void ServiceStarted(this ILogger logger, string workerName); + + [LoggerMessage(Message = "{workerName} sopped", Level = LogLevel.Information)] + public static partial void ServiceStopped(this ILogger logger, string workerName); + + [LoggerMessage(Message = "{message} processed", Level = LogLevel.Trace)] + public static partial void MessageProcessed(this ILogger logger, IEnvelope message); + +} diff --git a/src/Blumchen/DependencyInjection/ServiceCollectionExtensions.cs b/src/Blumchen/DependencyInjection/ServiceCollectionExtensions.cs new file mode 100644 index 0000000..daf6ac3 --- /dev/null +++ b/src/Blumchen/DependencyInjection/ServiceCollectionExtensions.cs @@ -0,0 +1,41 @@ +using Blumchen.Subscriber; +using Blumchen.Subscriptions.Replication; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; +using Npgsql; + +#pragma warning disable IL2091 + +namespace Blumchen.DependencyInjection; + +public static class ServiceCollectionExtensions +{ + + public static IServiceCollection AddBlumchen( + this IServiceCollection service, + Func workerOptions) + where T : class, IMessageHandler => + service + .AddHostedService(provider => + new Worker(workerOptions(provider, new WorkerOptionsBuilder()).Build(), + provider.GetRequiredService>>())); + + public static IServiceCollection AddBlumchen( + this IServiceCollection service, + string connectionString, + Func consumerFn) where T : class, IMessageHandler { + return service + .AddHostedService(provider => + new Worker(MinimalWorkerOptions(provider, new WorkerOptionsBuilder()).Build(), + provider.GetService>>() ?? new NullLogger>())); + + IWorkerOptionsBuilder MinimalWorkerOptions(IServiceProvider provider, IWorkerOptionsBuilder builder) + => builder.Subscription(optionsBuilder => consumerFn(provider, optionsBuilder) + .ConnectionString(connectionString) + .DataSource(new NpgsqlDataSourceBuilder(connectionString) + .UseLoggerFactory(provider.GetService()).Build())); + + + } +} diff --git a/src/Blumchen/DependencyInjection/Worker.cs b/src/Blumchen/DependencyInjection/Worker.cs new file mode 100644 index 0000000..3732620 --- /dev/null +++ b/src/Blumchen/DependencyInjection/Worker.cs @@ -0,0 +1,28 @@ +using Blumchen.Subscriptions; +using Blumchen.Subscriptions.Replication; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; + +namespace Blumchen.DependencyInjection; + +public class Worker( + WorkerOptions options, + ILogger> logger): BackgroundService where T : class, IMessageHandler +{ + private string WorkerName { get; } = $"{nameof(Worker)}<{typeof(T).Name}>"; + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + await options.OuterPipeline.ExecuteAsync(async token => + await options.InnerPipeline.ExecuteAsync(async ct => + { + await using var subscription = new Subscription(); + await using var cursor = subscription.Subscribe(options.SubscriberOptions, ct) + .GetAsyncEnumerator(ct); + logger.ServiceStarted(WorkerName); + while (await cursor.MoveNextAsync().ConfigureAwait(false) && !ct.IsCancellationRequested) + logger.MessageProcessed(cursor.Current); + }, token).ConfigureAwait(false), stoppingToken).ConfigureAwait(false); + logger.ServiceStopped(WorkerName); + } + +} diff --git a/src/Blumchen/DependencyInjection/WorkerOptionsBuilder.cs b/src/Blumchen/DependencyInjection/WorkerOptionsBuilder.cs new file mode 100644 index 0000000..45bb272 --- /dev/null +++ b/src/Blumchen/DependencyInjection/WorkerOptionsBuilder.cs @@ -0,0 +1,67 @@ +using System.Diagnostics.CodeAnalysis; +using Blumchen.Subscriber; +using Blumchen.Subscriptions.Management; +using Npgsql; +using Npgsql.Replication; +using Polly; + +namespace Blumchen.DependencyInjection; + +public record WorkerOptions( + ISubscriberOptions SubscriberOptions, + ResiliencePipeline OuterPipeline, + ResiliencePipeline InnerPipeline); + +public interface IWorkerOptionsBuilder +{ + IWorkerOptionsBuilder ResiliencyPipeline(ResiliencePipeline resiliencePipeline); + IWorkerOptionsBuilder Subscription(Func? builder); + WorkerOptions Build(); + IWorkerOptionsBuilder EnableSubscriptionAutoHeal(); +} + +internal sealed class WorkerOptionsBuilder: IWorkerOptionsBuilder +{ + private ResiliencePipeline? _outerPipeline; + private Func? _innerPipelineFn; + private Func? _builder; + + public IWorkerOptionsBuilder ResiliencyPipeline(ResiliencePipeline resiliencePipeline) + { + _outerPipeline = resiliencePipeline; + return this; + }public IWorkerOptionsBuilder Subscription(Func? builder) + { + _builder = builder; + return this; + } + + public WorkerOptions Build() + { + ArgumentNullException.ThrowIfNull(_builder); + var subscriberOptions = _builder(new OptionsBuilder()).Build(); + return new(subscriberOptions, _outerPipeline ?? ResiliencePipeline.Empty, + _innerPipelineFn?.Invoke(subscriberOptions.ReplicationOptions.SlotName,subscriberOptions.ConnectionStringBuilder.ConnectionString) ?? + ResiliencePipeline.Empty + ); + } + + public IWorkerOptionsBuilder EnableSubscriptionAutoHeal() + { + _innerPipelineFn = (replicationSlotName, connectionString) => new ResiliencePipelineBuilder().AddRetry(new() + { + ShouldHandle = + new PredicateBuilder().Handle(exception => + exception.SqlState.Equals("55000", StringComparison.OrdinalIgnoreCase)), + MaxRetryAttempts = int.MaxValue, + OnRetry = async args => + { + await using var conn = new LogicalReplicationConnection(connectionString); + await conn.Open(args.Context.CancellationToken); + await conn.ReCreate(replicationSlotName, args.Context.CancellationToken).ConfigureAwait(false); + }, + }).Build(); + return this; + } +} + diff --git a/src/Blumchen/Ensure.cs b/src/Blumchen/Ensure.cs new file mode 100644 index 0000000..7ecafb4 --- /dev/null +++ b/src/Blumchen/Ensure.cs @@ -0,0 +1,49 @@ +using System.Collections; +using Blumchen.Serialization; +using Blumchen.Subscriber; +using ImTools; + +namespace Blumchen; + +internal static class Ensure +{ + internal const string CannotBeMixedWithOtherConsumingStrategies = "`{0}` cannot be mixed with other consuming strategies"; + + internal const string RawRoutedByUrnErrorFormat = $"`{nameof(RawRoutedByUrnAttribute)}` missing on `{{0}}` message type"; + + public static void RawUrn(T value, params object?[] parameters) => new RawUrnTrait().IsValid(value, parameters); + public static void Null(T value, string parameters) => new NullTrait().IsValid(value, parameters); + public static void NotNull(T value, string parameters) => new NotNullTrait().IsValid(value, parameters); + public static void NotEmpty(T value, string parameters) => new NotEmptyTrait().IsValid(value, parameters); + public static void Empty(T value, string parameters) => new EmptyTrait().IsValid(value, parameters); + public static void And(Validable left, T v1, Validable right, TU v2, params object?[] parameters) => + _ = left.IsValid(v1, parameters) && right.IsValid(v2, parameters); +} + +internal abstract record Validable(Func Condition, string ErrorFormat) +{ + public bool IsValid(T value, params object?[] parameters) + { + if (!Condition(value)) + throw new ConfigurationException(string.Format(ErrorFormat, parameters)); + return true; + } +} + +internal record RawUrnTrait(): Validable(v => v is ICollection { Count: > 0 }, + Ensure.RawRoutedByUrnErrorFormat); + +internal record NullTrait() + : Validable(v => v is null, $"`{{0}}` method on {nameof(OptionsBuilder)} called more then once"); + +internal record NotNullTrait() + : Validable(v => v is not null, $"`{{0}}` method not called on {nameof(OptionsBuilder)}"); + +internal record NotEmptyTrait(): Validable(v => v is ICollection { Count: > 0 }, + $"No `{{0}}` method called on {nameof(OptionsBuilder)}"); + +internal record EmptyTrait() + : Validable(v => v is ICollection { Count: 0 }, Ensure.CannotBeMixedWithOtherConsumingStrategies); + +internal record BoolTrait(Func Condition, string ErrorFormat) + : Validable(Condition, ErrorFormat); diff --git a/src/Blumchen/IDictionaryExtensions.cs b/src/Blumchen/IDictionaryExtensions.cs new file mode 100644 index 0000000..999f8a9 --- /dev/null +++ b/src/Blumchen/IDictionaryExtensions.cs @@ -0,0 +1,12 @@ +namespace Blumchen; + +internal static class IDictionaryExtensions +{ + public static TR? FindByMultiKey(this IDictionary registry, params T[] parameters) where T : class + { + if (parameters.Length == 0) return default; + return registry.TryGetValue(parameters[0], out var value) + ? value + : FindByMultiKey(registry, parameters[1..parameters.Length]); + } +} diff --git a/src/Blumchen/Publications/MessageAppender.cs b/src/Blumchen/Publisher/MessageAppender.cs similarity index 56% rename from src/Blumchen/Publications/MessageAppender.cs rename to src/Blumchen/Publisher/MessageAppender.cs index 623c8b4..bdb8ccb 100644 --- a/src/Blumchen/Publications/MessageAppender.cs +++ b/src/Blumchen/Publisher/MessageAppender.cs @@ -1,14 +1,34 @@ using System.Collections; +using System.Text.Json.Serialization.Metadata; using Blumchen.Serialization; using Npgsql; -namespace Blumchen.Publications; -#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member +namespace Blumchen.Publisher; public static class MessageAppender { + public static async Task AppendAsync(object @input + , PublisherOptions resolver + , NpgsqlConnection connection + , NpgsqlTransaction transaction + , CancellationToken ct + ) + { + switch (@input) + { + case null: + throw new ArgumentNullException(nameof(@input)); + case IEnumerable inputs: + await AppendBatchAsyncOfT(inputs, resolver.TableDescriptor, resolver.JsonTypeResolver, connection, transaction, ct).ConfigureAwait(false); + break; + default: + await AppendAsyncOfT(input, resolver.TableDescriptor, resolver.JsonTypeResolver, connection, transaction, ct).ConfigureAwait(false); + break; + } + } + public static async Task AppendAsync(T @input - , (TableDescriptorBuilder.MessageTable tableDescriptor, IJsonTypeResolver jsonTypeResolver) resolver + , PublisherOptions resolver , NpgsqlConnection connection , NpgsqlTransaction transaction , CancellationToken ct @@ -19,58 +39,60 @@ public static async Task AppendAsync(T @input case null: throw new ArgumentNullException(nameof(@input)); case IEnumerable inputs: - await AppendBatchAsyncOfT(inputs, resolver.tableDescriptor, resolver.jsonTypeResolver, connection, transaction, ct).ConfigureAwait(false); + await AppendBatchAsyncOfT(inputs, resolver.TableDescriptor, resolver.JsonTypeResolver, connection, transaction, ct).ConfigureAwait(false); break; default: - await AppendAsyncOfT(input, resolver.tableDescriptor, resolver.jsonTypeResolver, connection, transaction, ct).ConfigureAwait(false); + await AppendAsyncOfT(input, resolver.TableDescriptor, resolver.JsonTypeResolver, connection, transaction, ct).ConfigureAwait(false); break; } } private static async Task AppendAsyncOfT(T input , TableDescriptorBuilder.MessageTable tableDescriptor - , IJsonTypeResolver typeResolver + , ITypeResolver typeResolver , NpgsqlConnection connection , NpgsqlTransaction transaction , CancellationToken ct) where T : class { - var (typeName, jsonTypeInfo) = typeResolver.Resolve(typeof(T)); + var (typeName, jsonTypeInfo) = typeResolver.Resolve(input.GetType()); var data = JsonSerialization.ToJson(@input, jsonTypeInfo); - var command = new NpgsqlCommand( + + await using var command = new NpgsqlCommand( $"INSERT INTO {tableDescriptor.Name}({tableDescriptor.MessageType.Name}, {tableDescriptor.Data.Name}) values ('{typeName}', '{data}')", connection, transaction ); + await command.PrepareAsync(ct).ConfigureAwait(false); await command.ExecuteNonQueryAsync(ct).ConfigureAwait(false); } public static async Task AppendAsync(T input - , (TableDescriptorBuilder.MessageTable tableDescriptor, IJsonTypeResolver resolver) options + , PublisherOptions options , string connectionString , CancellationToken ct) where T: class { var type = typeof(T); - var (typeName, jsonTypeInfo) = options.resolver.Resolve(type); + var (typeName, jsonTypeInfo) = options.JsonTypeResolver.Resolve(type); var data = JsonSerialization.ToJson(input, jsonTypeInfo); - var connection = new NpgsqlConnection(connectionString); - await using var connection1 = connection.ConfigureAwait(false); + await using var connection = new NpgsqlConnection(connectionString); await connection.OpenAsync(ct).ConfigureAwait(false); - var command = connection.CreateCommand(); + await using var command = connection.CreateCommand(); command.CommandText = - $"INSERT INTO {options.tableDescriptor.Name}({options.tableDescriptor.MessageType.Name}, {options.tableDescriptor.Data.Name}) values ('{typeName}', '{data}')"; + $"INSERT INTO {options.TableDescriptor.Name}({options.TableDescriptor.MessageType.Name}, {options.TableDescriptor.Data.Name}) values ('{typeName}', '{data}')"; + await command.PrepareAsync(ct).ConfigureAwait(false); await command.ExecuteNonQueryAsync(ct).ConfigureAwait(false); } private static async Task AppendBatchAsyncOfT(T inputs , TableDescriptorBuilder.MessageTable tableDescriptor - , IJsonTypeResolver resolver + , ITypeResolver resolver , NpgsqlConnection connection , NpgsqlTransaction transaction , CancellationToken ct) where T : class, IEnumerable { - var batch = new NpgsqlBatch(connection, transaction); + await using var batch = new NpgsqlBatch(connection, transaction); foreach (var input in inputs) { var (typeName, jsonTypeInfo) = resolver.Resolve(input.GetType()); diff --git a/src/Blumchen/Publications/PublisherSetupOptionsBuilder.cs b/src/Blumchen/Publisher/OptionsBuilder.cs similarity index 59% rename from src/Blumchen/Publications/PublisherSetupOptionsBuilder.cs rename to src/Blumchen/Publisher/OptionsBuilder.cs index 90e7792..e4d9582 100644 --- a/src/Blumchen/Publications/PublisherSetupOptionsBuilder.cs +++ b/src/Blumchen/Publisher/OptionsBuilder.cs @@ -3,43 +3,47 @@ using JetBrains.Annotations; using static Blumchen.TableDescriptorBuilder; -namespace Blumchen.Publications; +namespace Blumchen.Publisher; -#pragma warning disable CS1591 -public class PublisherSetupOptionsBuilder +public class OptionsBuilder { - private INamingPolicy? _namingPolicy; - private JsonSerializerContext? _jsonSerializerContext; + [System.Diagnostics.CodeAnalysis.NotNull] + private INamingPolicy? _namingPolicy = default; + + [System.Diagnostics.CodeAnalysis.NotNull] + private JsonSerializerContext? _jsonSerializerContext = default; + private static readonly TableDescriptorBuilder TableDescriptorBuilder = new(); - private MessageTable? _tableDescriptor; + + private MessageTable? _tableDescriptor = default; [UsedImplicitly] - public PublisherSetupOptionsBuilder NamingPolicy(INamingPolicy namingPolicy) + public OptionsBuilder NamingPolicy(INamingPolicy namingPolicy) { _namingPolicy = namingPolicy; return this; } [UsedImplicitly] - public PublisherSetupOptionsBuilder JsonContext(JsonSerializerContext jsonSerializerContext) + public OptionsBuilder JsonContext(JsonSerializerContext jsonSerializerContext) { _jsonSerializerContext = jsonSerializerContext; return this; } [UsedImplicitly] - public PublisherSetupOptionsBuilder WithTable(Func builder) + public OptionsBuilder WithTable(Func builder) { _tableDescriptor = builder(TableDescriptorBuilder).Build(); return this; } - public (MessageTable tableDescriptor, IJsonTypeResolver jsonTypeResolver) Build() + public PublisherOptions Build() { - ArgumentNullException.ThrowIfNull(_jsonSerializerContext); - ArgumentNullException.ThrowIfNull(_namingPolicy); - _tableDescriptor ??= TableDescriptorBuilder.Build(); + Ensure.NotNull(_jsonSerializerContext, nameof(JsonContext)); + Ensure.NotNull(_namingPolicy, nameof(NamingPolicy)); + var jsonTypeResolver = new JsonTypeResolver(_jsonSerializerContext, _namingPolicy); using var typeEnum = _jsonSerializerContext.GetType() .GetCustomAttributesData() @@ -49,6 +53,6 @@ public PublisherSetupOptionsBuilder WithTable(Func JsonTypeResolver); + +public static class PublisherOptionsExtensions +{ + [UsedImplicitly] + public static async Task EnsureTable(this PublisherOptions publisherOptions, NpgsqlDataSource dataSource, CancellationToken ct) + { + await dataSource.EnsureTableExists(publisherOptions.TableDescriptor, ct); + return publisherOptions; + } + + [UsedImplicitly] + public static async Task EnsureTable(this PublisherOptions publisherOptions, + string connectionString, CancellationToken ct) + => await EnsureTable(publisherOptions, new NpgsqlDataSourceBuilder(connectionString).Build(), ct); +} diff --git a/src/Blumchen/Serialization/IDictionaryExtensions.cs b/src/Blumchen/Serialization/IDictionaryExtensions.cs deleted file mode 100644 index f97da92..0000000 --- a/src/Blumchen/Serialization/IDictionaryExtensions.cs +++ /dev/null @@ -1,9 +0,0 @@ -#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member -namespace Blumchen.Serialization; - - -public static class DictionaryExtensions -{ - internal static IEnumerable Keys(this JsonTypeResolver? resolver) => resolver?.RegisteredTypes.Keys ?? Enumerable.Empty(); - internal static IEnumerable Values(this JsonTypeResolver? resolver) => resolver?.RegisteredTypes.Values ?? Enumerable.Empty(); -} diff --git a/src/Blumchen/Serialization/INamingPolicy.cs b/src/Blumchen/Serialization/INamingPolicy.cs index 68c96ef..f7daa22 100644 --- a/src/Blumchen/Serialization/INamingPolicy.cs +++ b/src/Blumchen/Serialization/INamingPolicy.cs @@ -1,5 +1,4 @@ namespace Blumchen.Serialization; -#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member public interface INamingPolicy { @@ -12,6 +11,6 @@ public abstract record NamingPolicy(Func Bind):INamingPolicy } //This should be used in shared kernel scenario where common library is shared between Pub and Sub -public record FQNNamingPolicy(): NamingPolicy(type => type.FullName!); +public sealed record FQNNamingPolicy(): NamingPolicy(type => type.FullName!); //This policy is better suited for distributed components -public record AttributeNamingPolicy(): NamingPolicy(MessageUrn.ForTypeString); +public sealed record AttributeNamingPolicy(): NamingPolicy(MessageUrn.ForTypeString); diff --git a/src/Blumchen/Serialization/ITypeResolver.cs b/src/Blumchen/Serialization/ITypeResolver.cs index 049462b..e822c23 100644 --- a/src/Blumchen/Serialization/ITypeResolver.cs +++ b/src/Blumchen/Serialization/ITypeResolver.cs @@ -3,19 +3,17 @@ using System.Text.Json.Serialization.Metadata; namespace Blumchen.Serialization; -#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member public interface ITypeResolver { (string, T) Resolve(Type type); + Type Resolve(string type); } -public interface IJsonTypeResolver: ITypeResolver; - internal sealed class JsonTypeResolver( JsonSerializerContext serializationContext, INamingPolicy? namingPolicy = default) - : IJsonTypeResolver + : ITypeResolver { public JsonSerializerContext SerializationContext { get; } = serializationContext; private readonly ConcurrentDictionary _typeDictionary = []; @@ -25,14 +23,14 @@ internal sealed class JsonTypeResolver( internal void WhiteList(Type type) { var typeInfo = SerializationContext.GetTypeInfo(type) ?? throw new NotSupportedException(type.FullName); - _typeDictionary.AddOrUpdate(_namingPolicy.Bind(typeInfo.Type), _ => typeInfo.Type, (s,t) =>typeInfo.Type); - _typeInfoDictionary.AddOrUpdate(typeInfo.Type, _ => typeInfo, (_,__)=> typeInfo); + _typeDictionary.AddOrUpdate(_namingPolicy.Bind(typeInfo.Type), _ => typeInfo.Type, (_,_) =>typeInfo.Type); + _typeInfoDictionary.AddOrUpdate(typeInfo.Type, _ => typeInfo, (_,_)=> typeInfo); } public (string, JsonTypeInfo) Resolve(Type type) => (_typeDictionary.Single(kv => kv.Value == type).Key, _typeInfoDictionary[type]); - internal IDictionary RegisteredTypes { get => _typeDictionary; } - internal Type Resolve(string type) => _typeDictionary[type]; + public IDictionary RegisteredTypes { get => _typeDictionary; } + public Type Resolve(string type) => _typeDictionary[type]; } diff --git a/src/Blumchen/Serialization/JsonSerialization.cs b/src/Blumchen/Serialization/JsonSerialization.cs index feb4095..6fdfad0 100644 --- a/src/Blumchen/Serialization/JsonSerialization.cs +++ b/src/Blumchen/Serialization/JsonSerialization.cs @@ -4,7 +4,6 @@ using Blumchen.Streams; namespace Blumchen.Serialization; -#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member public static class JsonSerialization { diff --git a/src/Blumchen/Serialization/MessageUrnAttribute.cs b/src/Blumchen/Serialization/MessageUrnAttribute.cs deleted file mode 100644 index 8963e34..0000000 --- a/src/Blumchen/Serialization/MessageUrnAttribute.cs +++ /dev/null @@ -1,60 +0,0 @@ -using System.Collections.Concurrent; - -namespace Blumchen.Serialization; -#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member - -[AttributeUsage(AttributeTargets.Class | AttributeTargets.Interface)] -public class MessageUrnAttribute: - Attribute -{ - /// - /// - /// The urn value to use for this message type. - public MessageUrnAttribute(string urn) - { - ArgumentException.ThrowIfNullOrEmpty(urn, nameof(urn)); - - if (urn.StartsWith(MessageUrn.Prefix)) - throw new ArgumentException($"Value should not contain the default prefix '{MessageUrn.Prefix}'.", nameof(urn)); - - Urn = FormatUrn(urn); - } - - public Uri Urn { get; } - - private static Uri FormatUrn(string urn) - { - var fullValue = MessageUrn.Prefix + urn; - - if (Uri.TryCreate(fullValue, UriKind.Absolute, out var uri)) - return uri; - - throw new UriFormatException($"Invalid URN: {fullValue}"); - } -} - - -public static class MessageUrn -{ - public const string Prefix = "urn:message:"; - - private static readonly ConcurrentDictionary Cache = new(); - - - public static string ForTypeString(Type type) => - Cache.GetOrAdd(type,t => - { - var attribute = Attribute.GetCustomAttribute(t, typeof(MessageUrnAttribute)) as MessageUrnAttribute ?? - throw new NotSupportedException($"Attribute not defined fot type '{type}'"); - return new Cached(attribute.Urn, attribute.Urn.ToString()); - }).UrnString; - - - private interface ICached - { - Uri Urn { get; } - string UrnString { get; } - } - - private record Cached(Uri Urn, string UrnString): ICached; -} diff --git a/src/Blumchen/Serialization/RoutingByAttributes.cs b/src/Blumchen/Serialization/RoutingByAttributes.cs new file mode 100644 index 0000000..36f69ef --- /dev/null +++ b/src/Blumchen/Serialization/RoutingByAttributes.cs @@ -0,0 +1,77 @@ +using System.Collections.Concurrent; + +namespace Blumchen.Serialization; + +public interface IRouted +{ + string Route { get; } +} + +[AttributeUsage(AttributeTargets.Class, AllowMultiple = false, Inherited = false)] +public class MessageRoutedByUrnAttribute(string route): + Attribute, IRouted +{ + public string Route { get; } = Format(route); + + private static string Format(string urn) + { + ArgumentException.ThrowIfNullOrEmpty(urn, nameof(urn)); + + if (urn.StartsWith(MessageUrn.Prefix)) + throw new ArgumentException($"Value should not contain the default prefix '{MessageUrn.Prefix}'.", nameof(urn)); + + return FormatUrn(urn).AbsoluteUri; + } + + private static Uri FormatUrn(string urn) + { + var fullValue = MessageUrn.Prefix + urn; + + if (Uri.TryCreate(fullValue, UriKind.Absolute, out var uri)) + return uri; + + throw new UriFormatException($"Invalid URN: {fullValue}"); + } +} + +internal static class MessageUrn +{ + public const string Prefix = "urn:message:"; + + private static readonly ConcurrentDictionary Cache = new(); + + public static string ForTypeString(Type type) => + Cache.GetOrAdd(type, t => + { + var attribute = Attribute.GetCustomAttribute(t, typeof(MessageRoutedByUrnAttribute)) as MessageRoutedByUrnAttribute ?? + throw new NotSupportedException($"Attribute not defined fot type '{type}'"); + return new Cached(attribute.Route); + }).UrnString; + + + private interface ICached + { + string UrnString { get; } + } + + private record Cached(string UrnString): ICached; +} + +[AttributeUsage(AttributeTargets.Class, AllowMultiple = true, Inherited = false)] +public class RawRoutedByUrnAttribute(string route): MessageRoutedByUrnAttribute(route); + +[AttributeUsage(AttributeTargets.Class, AllowMultiple = true, Inherited = false)] +public class RawRoutedByStringAttribute(string name): Attribute, IRouted +{ + private static string Format(string name) + { + if(string.IsNullOrWhiteSpace(name)) + throw new FormatException($"Invalid {nameof(name)}: {name}."); + + return name; + } + public string Route { get; } = Format(name); +} + + + diff --git a/src/Blumchen/Subscriber/IConsumes.cs b/src/Blumchen/Subscriber/IConsumes.cs new file mode 100644 index 0000000..41b8581 --- /dev/null +++ b/src/Blumchen/Subscriber/IConsumes.cs @@ -0,0 +1,15 @@ +using Blumchen.Subscriptions.Replication; +using static Blumchen.Subscriber.OptionsBuilder; + +namespace Blumchen.Subscriber; + +public interface IConsumes +{ + OptionsBuilder ConsumesRawStrings(IMessageHandler handler); + OptionsBuilder ConsumesRawObjects(IMessageHandler handler); + OptionsBuilder ConsumesRawString(IMessageHandler handler) where T : class; + OptionsBuilder ConsumesRawObject(IMessageHandler handler) where T : class; + + OptionsBuilder Consumes(IMessageHandler handler, Func opts) + where T : class; +} diff --git a/src/Blumchen/Subscriber/ISubscriberOptions.cs b/src/Blumchen/Subscriber/ISubscriberOptions.cs new file mode 100644 index 0000000..7ebc706 --- /dev/null +++ b/src/Blumchen/Subscriber/ISubscriberOptions.cs @@ -0,0 +1,35 @@ +using System.Reflection; +using Blumchen.Subscriptions; +using Blumchen.Subscriptions.Replication; +using JetBrains.Annotations; +using Npgsql; +using static Blumchen.Subscriptions.Management.PublicationManagement; +using static Blumchen.Subscriptions.Management.ReplicationSlotManagement; + +namespace Blumchen.Subscriber; + +public interface ISubscriberOptions +{ + [UsedImplicitly] NpgsqlDataSource DataSource { get; } + [UsedImplicitly] NpgsqlConnectionStringBuilder ConnectionStringBuilder { get; } + IDictionary> Registry { get; } + [UsedImplicitly] PublicationOptions PublicationOptions { get; } + [UsedImplicitly] ReplicationSlotOptions ReplicationOptions { get; } + [UsedImplicitly] IErrorProcessor ErrorProcessor { get; } + + void Deconstruct( + out NpgsqlDataSource dataSource, + out NpgsqlConnectionStringBuilder connectionStringBuilder, + out PublicationOptions publicationOptions, + out ReplicationSlotOptions replicationSlotOptions, + out IErrorProcessor errorProcessor, + out IDictionary> registry); +} + +internal record SubscriberOptions( + NpgsqlDataSource DataSource, + NpgsqlConnectionStringBuilder ConnectionStringBuilder, + PublicationOptions PublicationOptions, + ReplicationSlotOptions ReplicationOptions, + IErrorProcessor ErrorProcessor, + IDictionary> Registry): ISubscriberOptions; diff --git a/src/Blumchen/Subscriber/OptionsBuilder.Consumes.cs b/src/Blumchen/Subscriber/OptionsBuilder.Consumes.cs new file mode 100644 index 0000000..2e64319 --- /dev/null +++ b/src/Blumchen/Subscriber/OptionsBuilder.Consumes.cs @@ -0,0 +1,74 @@ +using System.Reflection; +using System.Text.Json.Serialization; +using Blumchen.Serialization; +using Blumchen.Subscriptions.Replication; +using ImTools; +using JetBrains.Annotations; + +namespace Blumchen.Subscriber; + +public sealed partial class OptionsBuilder +{ + private INamingPolicy? _namingPolicy; + private JsonSerializerContext? _jsonSerializerContext; + + [UsedImplicitly] + internal OptionsBuilder NamingPolicy(INamingPolicy namingPolicy) + { + Ensure.Null(_namingPolicy, nameof(NamingPolicy)); + _namingPolicy = namingPolicy; + return this; + } + + public interface INamingOptionsContext + { + OptionsBuilder AndNamingPolicy(INamingPolicy namingPolicy); + } + + internal class NamingOptionsContext(OptionsBuilder builder): INamingOptionsContext + { + public OptionsBuilder AndNamingPolicy(INamingPolicy namingPolicy) + => builder.NamingPolicy(namingPolicy); + } + + public interface IConsumesTypedJsonOptionsContext + { + INamingOptionsContext WithJsonContext(JsonSerializerContext jsonSerializerContext); + IConsumesTypedJsonOptionsContext Consumes(IMessageHandler handler) where T : class; + } + + internal class ConsumesTypedJsonTypedJsonOptionsContext(OptionsBuilder builder): IConsumesTypedJsonOptionsContext + { + public INamingOptionsContext WithJsonContext(JsonSerializerContext jsonSerializerContext) + { + builder._jsonSerializerContext = jsonSerializerContext; + return new NamingOptionsContext(builder); + } + + public IConsumesTypedJsonOptionsContext Consumes(IMessageHandler handler) where T : class + { + return builder.Consumes(handler); + } + } + + internal IConsumesTypedJsonOptionsContext Consumes(IMessageHandler handler) where T : class + { + Ensure.Empty(_replicationDataMapperSelector, nameof(Consumes)); + var methodInfo = handler + .GetType() + .GetMethod(nameof(IMessageHandler.Handle), BindingFlags.Instance | BindingFlags.Public, [typeof(T)]) + ?? throw new ConfigurationException($"Unable to find {nameof(IMessageHandler)} implementation on {handler.GetType().Name}"); + + if (_typeRegistry.GetEntryOrNull(typeof(T).GetHashCode()) != null) + throw new ConfigurationException($"`{typeof(T).Name}` was already registered."); + _typeRegistry = _typeRegistry.AddSureNotPresentEntry( + new KVEntry>(typeof(T).GetHashCode(), typeof(T), + new Tuple(handler, methodInfo)) + ); + + return new ConsumesTypedJsonTypedJsonOptionsContext(this); + } + + public OptionsBuilder Consumes(IMessageHandler handler, Func opts) where T : class + => opts(Consumes(handler)); +} diff --git a/src/Blumchen/Subscriber/OptionsBuilder.cs b/src/Blumchen/Subscriber/OptionsBuilder.cs new file mode 100644 index 0000000..2c47388 --- /dev/null +++ b/src/Blumchen/Subscriber/OptionsBuilder.cs @@ -0,0 +1,203 @@ +using System.Collections; +using System.Reflection; +using System.Security.Principal; +using System.Text.Json.Serialization; +using Blumchen.Serialization; +using Blumchen.Subscriptions; +using Blumchen.Subscriptions.Management; +using Blumchen.Subscriptions.Replication; +using ImTools; +using JetBrains.Annotations; +using Npgsql; +// ReSharper disable RedundantDefaultMemberInitializer + +namespace Blumchen.Subscriber; + +public sealed partial class OptionsBuilder + : IConsumes +{ + internal const string WildCard = "*"; + + [System.Diagnostics.CodeAnalysis.NotNull] + private NpgsqlConnectionStringBuilder? _connectionStringBuilder = default; + + [System.Diagnostics.CodeAnalysis.NotNull] + private NpgsqlDataSource? _dataSource = default; + + private PublicationManagement.PublicationOptions _publicationOptions = new(); + private ReplicationSlotManagement.ReplicationSlotOptions? _replicationSlotOptions; + private ImHashMap> _typeRegistry = ImHashMap>.Empty; + + private readonly Dictionary> + _replicationDataMapperSelector = []; + + private IErrorProcessor? _errorProcessor; + private static readonly TableDescriptorBuilder TableDescriptorBuilder = new(); + private TableDescriptorBuilder.MessageTable? _tableDescriptor; + + private IReplicationJsonBMapper? _jsonDataMapper; + + + [UsedImplicitly] + public OptionsBuilder WithTable( + Func builder) + { + _tableDescriptor = builder(TableDescriptorBuilder).Build(); + return this; + } + + [UsedImplicitly] + public OptionsBuilder ConnectionString(string connectionString) + { + Ensure.Null(_connectionStringBuilder, nameof(ConnectionString)); + _connectionStringBuilder = new NpgsqlConnectionStringBuilder(connectionString); + return this; + } + + [UsedImplicitly] + public OptionsBuilder DataSource(NpgsqlDataSource dataSource) + { + Ensure.Null(_dataSource, nameof(DataSource)); + _dataSource = dataSource; + return this; + } + + [UsedImplicitly] + public OptionsBuilder WithPublicationOptions(PublicationManagement.PublicationOptions publicationOptions) + { + + _publicationOptions = + publicationOptions with { RegisteredTypes = _publicationOptions.RegisteredTypes }; + return this; + } + + [UsedImplicitly] + public OptionsBuilder WithReplicationOptions( + ReplicationSlotManagement.ReplicationSlotOptions replicationSlotOptions) + { + _replicationSlotOptions = replicationSlotOptions; + return this; + } + + [UsedImplicitly] + public OptionsBuilder ConsumesRawObject(IMessageHandler handler) where T : class + => ConsumesRaw(handler, ObjectReplicationDataMapper.Instance); + + [UsedImplicitly] + public OptionsBuilder ConsumesRawString(IMessageHandler handler) where T : class + => ConsumesRaw(handler, StringReplicationDataMapper.Instance); + + [UsedImplicitly] + public OptionsBuilder ConsumesRawStrings(IMessageHandler handler) + { + Ensure.And( + new EmptyTrait(), _replicationDataMapperSelector, + new BoolTrait>>(r => r.IsEmpty, Ensure.CannotBeMixedWithOtherConsumingStrategies), + _typeRegistry,nameof(ConsumesRawStrings) + ); + + var methodInfo = handler + .GetType() + .GetMethod(nameof(IMessageHandler.Handle),BindingFlags.Instance | BindingFlags.Public, [typeof(string)]) + ?? throw new ConfigurationException($"Unable to find {nameof(IMessageHandler)} implementation on {handler.GetType().Name}"); + + _replicationDataMapperSelector.Add(WildCard, + new Tuple(StringReplicationDataMapper.Instance, handler, methodInfo)); + return this; + } + + [UsedImplicitly] + public OptionsBuilder ConsumesRawObjects(IMessageHandler handler) + { + Ensure.And( + new EmptyTrait(), _replicationDataMapperSelector, + new BoolTrait>>(r => r.IsEmpty, Ensure.CannotBeMixedWithOtherConsumingStrategies), + _typeRegistry, nameof(ConsumesRawObjects) + ); + + var methodInfo = handler + .GetType() + .GetMethod(nameof(IMessageHandler.Handle), BindingFlags.Instance | BindingFlags.Public, [typeof(object)]) + ?? throw new ConfigurationException($"Unable to find {nameof(IMessageHandler)} implementation on {handler.GetType().Name}"); + + + _replicationDataMapperSelector.Add(WildCard, + new Tuple(ObjectReplicationDataMapper.Instance, handler, methodInfo)); + return this; + } + + private OptionsBuilder ConsumesRaw(IMessageHandler handler, + IReplicationJsonBMapper dataMapper) where T : class where TU : class + { + var routingList = typeof(T) + .GetAttributes() + .Union( + typeof(T).GetAttributes() + ) + .Select(routed => routed.Route).ToList(); + Ensure.RawUrn>(routingList, typeof(T).Name); + + var methodInfo = handler + .GetType() + .GetMethod(nameof(IMessageHandler.Handle), BindingFlags.Instance | BindingFlags.Public, [typeof(TU)]) + ?? throw new ConfigurationException($"Unable to find {nameof(IMessageHandler)} implementation on {handler.GetType().Name}"); + + using var urnEnum = routingList.GetEnumerator(); + while (urnEnum.MoveNext()) + _replicationDataMapperSelector.Add(urnEnum.Current, + new Tuple(dataMapper, handler, methodInfo)); + return this; + } + + [UsedImplicitly] + public OptionsBuilder WithErrorProcessor(IErrorProcessor? errorProcessor) + { + _errorProcessor = errorProcessor; + return this; + } + + internal ISubscriberOptions Build() + { + _tableDescriptor ??= TableDescriptorBuilder.Build(); + Ensure.NotNull(_connectionStringBuilder, $"{nameof(ConnectionString)}"); + Ensure.NotNull(_dataSource, $"{nameof(DataSource)}"); + + if (!_typeRegistry.IsEmpty) + { + Ensure.NotNull(_namingPolicy, $"{nameof(NamingPolicy)}"); + if (_jsonSerializerContext != null) + { + var typeResolver = new JsonTypeResolver(_jsonSerializerContext, _namingPolicy); + foreach (var kv in _typeRegistry.Enumerate()) + typeResolver.WhiteList(kv.Key); + + _jsonDataMapper = + new JsonReplicationDataMapper(typeResolver, new JsonReplicationDataReader(typeResolver)); + + foreach (var (key, value) in typeResolver.RegisteredTypes.Join(_typeRegistry.Enumerate(), pair => pair.Value, + pair => pair.Key, (pair, valuePair) => (pair.Key, valuePair.Value))) + _replicationDataMapperSelector.Add(key, + new Tuple(_jsonDataMapper, value.Item1, value.Item2)); + } + else + { + throw new ConfigurationException($"`${nameof(Consumes)}<>` requires a valid `{nameof(JsonSerializerContext)}`."); + } + } + Ensure.NotEmpty(_replicationDataMapperSelector, $"{nameof(Consumes)}..."); + _publicationOptions = _publicationOptions + with + { + RegisteredTypes = _replicationDataMapperSelector.Keys.Except([WildCard]).ToHashSet(), + TableDescriptor = _tableDescriptor + }; + return new SubscriberOptions( + _dataSource, + _connectionStringBuilder, + _publicationOptions, + _replicationSlotOptions ?? new ReplicationSlotManagement.ReplicationSlotOptions(), + _errorProcessor ?? new ConsoleOutErrorProcessor(), + _replicationDataMapperSelector + ); + } +} diff --git a/src/Blumchen/Subscriber/TypeExtensions.cs b/src/Blumchen/Subscriber/TypeExtensions.cs new file mode 100644 index 0000000..2dcd3e8 --- /dev/null +++ b/src/Blumchen/Subscriber/TypeExtensions.cs @@ -0,0 +1,10 @@ +using Blumchen.Serialization; + +namespace Blumchen.Subscriber; + +internal static class TypeExtensions +{ + public static IEnumerable GetAttributes(this Type type) + where T : Attribute, IRouted => + type.GetCustomAttributes(typeof(T), false).OfType(); +} diff --git a/src/Blumchen/Subscriptions/IConsume.cs b/src/Blumchen/Subscriptions/IConsume.cs deleted file mode 100644 index 4a59c0f..0000000 --- a/src/Blumchen/Subscriptions/IConsume.cs +++ /dev/null @@ -1,9 +0,0 @@ -namespace Blumchen.Subscriptions; -#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member - -public interface IConsume; - -public interface IConsumes: IConsume where T : class -{ - Task Handle(T value); -} diff --git a/src/Blumchen/Subscriptions/IErrorProcessor.cs b/src/Blumchen/Subscriptions/IErrorProcessor.cs new file mode 100644 index 0000000..889d97d --- /dev/null +++ b/src/Blumchen/Subscriptions/IErrorProcessor.cs @@ -0,0 +1,23 @@ +using Microsoft.Extensions.Logging; + +namespace Blumchen.Subscriptions; + +public interface IErrorProcessor +{ + Func Process { get; } +} + +public record ConsoleOutErrorProcessor: IErrorProcessor +{ + public Func Process => (exception, id) => Console.Out.WriteLineAsync($"record id:{id} resulted in error:{exception.Message}"); +} + +public record LoggingErrorProcessor(ILogger Logger): IErrorProcessor +{ + public Func Process => (exception, id) + => + { + Logger.LogError("record id:{id} resulted in error:{exception.Message}", id, exception.Message); + return Task.CompletedTask; + }; +} diff --git a/src/Blumchen/Subscriptions/ISubscriptionOptions.cs b/src/Blumchen/Subscriptions/ISubscriptionOptions.cs deleted file mode 100644 index 1be177c..0000000 --- a/src/Blumchen/Subscriptions/ISubscriptionOptions.cs +++ /dev/null @@ -1,31 +0,0 @@ -using Blumchen.Subscriptions.Replication; -using JetBrains.Annotations; -using static Blumchen.Subscriptions.Management.PublicationManagement; -using static Blumchen.Subscriptions.Management.ReplicationSlotManagement; - -namespace Blumchen.Subscriptions; - -internal interface ISubscriptionOptions -{ - [UsedImplicitly] string ConnectionString { get; } - IReplicationDataMapper DataMapper { get; } - [UsedImplicitly] PublicationSetupOptions PublicationOptions { get; } - [UsedImplicitly] ReplicationSlotSetupOptions ReplicationOptions { get; } - [UsedImplicitly] IErrorProcessor ErrorProcessor { get; } - - void Deconstruct( - out string connectionString, - out PublicationSetupOptions publicationSetupOptions, - out ReplicationSlotSetupOptions replicationSlotSetupOptions, - out IErrorProcessor errorProcessor, - out IReplicationDataMapper dataMapper, - out Dictionary registry); -} - -internal record SubscriptionOptions( - string ConnectionString, - PublicationSetupOptions PublicationOptions, - ReplicationSlotSetupOptions ReplicationOptions, - IErrorProcessor ErrorProcessor, - IReplicationDataMapper DataMapper, - Dictionary Registry): ISubscriptionOptions; diff --git a/src/Blumchen/Subscriptions/Management/PublicationManagement.cs b/src/Blumchen/Subscriptions/Management/PublicationManagement.cs index 64e6033..2f667f3 100644 --- a/src/Blumchen/Subscriptions/Management/PublicationManagement.cs +++ b/src/Blumchen/Subscriptions/Management/PublicationManagement.cs @@ -1,9 +1,7 @@ using Blumchen.Database; -using Blumchen.Serialization; using Npgsql; #pragma warning disable CA2208 -#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member namespace Blumchen.Subscriptions.Management; @@ -13,41 +11,39 @@ public static class PublicationManagement { public static async Task SetupPublication( this NpgsqlDataSource dataSource, - PublicationSetupOptions setupOptions, + PublicationOptions options, CancellationToken ct ) { - var (publicationName, createStyle, shouldReAddTablesIfWereRecreated, typeResolver, tableDescription) = setupOptions; + var (publicationName, createStyle, shouldReAddTablesIfWereRecreated, registeredTypes, tableDescription) = options; return createStyle switch { Subscription.CreateStyle.Never => new None(), - Subscription.CreateStyle.AlwaysRecreate => await ReCreate(dataSource, publicationName, tableDescription.Name, typeResolver, ct).ConfigureAwait(false), + Subscription.CreateStyle.AlwaysRecreate => await ReCreate(dataSource, publicationName, tableDescription.Name, registeredTypes, ct).ConfigureAwait(false), Subscription.CreateStyle.WhenNotExists when await dataSource.PublicationExists(publicationName, ct).ConfigureAwait(false) => await Refresh(dataSource, publicationName, tableDescription.Name, shouldReAddTablesIfWereRecreated, ct).ConfigureAwait(false), - Subscription.CreateStyle.WhenNotExists => await Create(dataSource, publicationName, tableDescription.Name, typeResolver, ct).ConfigureAwait(false), - _ => throw new ArgumentOutOfRangeException(nameof(setupOptions.CreateStyle)) + Subscription.CreateStyle.WhenNotExists => await Create(dataSource, publicationName, tableDescription.Name, registeredTypes, ct).ConfigureAwait(false), + _ => throw new ArgumentOutOfRangeException(nameof(options.CreateStyle)) }; static async Task ReCreate( NpgsqlDataSource dataSource, string publicationName, string tableName, - JsonTypeResolver? typeResolver, + ISet registeredTypes, CancellationToken ct ) { await dataSource.DropPublication(publicationName, ct).ConfigureAwait(false); - return await Create(dataSource, publicationName, tableName, typeResolver, ct).ConfigureAwait(false); + return await Create(dataSource, publicationName, tableName, registeredTypes, ct).ConfigureAwait(false); } static async Task Create(NpgsqlDataSource dataSource, string publicationName, string tableName, - JsonTypeResolver? typeResolver, + ISet registeredTypes, CancellationToken ct ) { - await dataSource.CreatePublication(publicationName, tableName, - typeResolver.Keys().ToHashSet(), ct).ConfigureAwait(false); - + await dataSource.CreatePublication(publicationName, tableName, registeredTypes, ct).ConfigureAwait(false); return new Created(); } @@ -63,25 +59,30 @@ CancellationToken ct } } - internal static Task CreatePublication( - this NpgsqlDataSource dataSource, + internal static string CreatePublication( string publicationName, string tableName, - ISet eventTypes, - CancellationToken ct - ) { - return eventTypes.Count switch + ISet registeredTypes + ) + { + var sql = $"CREATE PUBLICATION \"{publicationName}\" FOR TABLE {tableName} {{0}} WITH (publish = 'insert');"; + return registeredTypes.Count switch { - 0 => Execute(dataSource, $"CREATE PUBLICATION {publicationName} FOR TABLE {tableName} WITH (publish = 'insert');", - ct - ), - _ => Execute(dataSource, $"CREATE PUBLICATION {publicationName} FOR TABLE {tableName} WHERE ({PublicationFilter(eventTypes)}) WITH (publish = 'insert');", - ct - ) + 0 => string.Format(sql, string.Empty), + _ => string.Format(sql, $"WHERE ({PublicationFilter(registeredTypes)})") }; static string PublicationFilter(ICollection input) => string.Join(" OR ", input.Select(s => $"message_type = '{s}'")); } + internal static Task CreatePublication( + this NpgsqlDataSource dataSource, + string publicationName, + string tableName, + ISet registeredTypes, + CancellationToken ct + ) => Execute(dataSource, CreatePublication(publicationName, tableName, registeredTypes), ct); + + private static async Task Execute( this NpgsqlDataSource dataSource, string sql, @@ -129,8 +130,7 @@ private static Task PublicationExists( this NpgsqlDataSource dataSource, string publicationName, CancellationToken ct - ) => - dataSource.Exists("pg_publication", "pubname = $1", [publicationName], ct); + ) => dataSource.Exists("pg_publication", "pubname = $1", [publicationName], ct); public abstract record SetupPublicationResult { @@ -141,14 +141,14 @@ public record AlreadyExists: SetupPublicationResult; public record Created: SetupPublicationResult; } - public sealed record PublicationSetupOptions( - string PublicationName = PublicationSetupOptions.DefaultPublicationName, + public sealed record PublicationOptions( + string PublicationName = PublicationOptions.DefaultName, Subscription.CreateStyle CreateStyle = Subscription.CreateStyle.WhenNotExists, bool ShouldReAddTablesIfWereRecreated = false ) { - internal const string DefaultPublicationName = "pub"; - internal JsonTypeResolver? TypeResolver { get; init; } = default; + internal const string DefaultName = "pub"; + internal ISet RegisteredTypes { get; init; } = Enumerable.Empty().ToHashSet(); internal TableDescriptorBuilder.MessageTable TableDescriptor { get; init; } = new TableDescriptorBuilder().Build(); @@ -156,13 +156,13 @@ internal void Deconstruct( out string publicationName, out Subscription.CreateStyle createStyle, out bool reAddTablesIfWereRecreated, - out JsonTypeResolver? typeResolver, + out ISet registeredTypes, out TableDescriptorBuilder.MessageTable tableDescription) { publicationName = PublicationName; createStyle = Subscription.CreateStyle.WhenNotExists; reAddTablesIfWereRecreated = ShouldReAddTablesIfWereRecreated; - typeResolver = TypeResolver; + registeredTypes = RegisteredTypes; tableDescription = TableDescriptor; } diff --git a/src/Blumchen/Subscriptions/Management/ReplicationSlotManagement.cs b/src/Blumchen/Subscriptions/Management/ReplicationSlotManagement.cs index 4595dd4..0ce5ee1 100644 --- a/src/Blumchen/Subscriptions/Management/ReplicationSlotManagement.cs +++ b/src/Blumchen/Subscriptions/Management/ReplicationSlotManagement.cs @@ -5,15 +5,20 @@ namespace Blumchen.Subscriptions.Management; using static ReplicationSlotManagement.CreateReplicationSlotResult; -#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member public static class ReplicationSlotManagement { #pragma warning disable CA2208 + private static Task ReplicationSlotExists( + this NpgsqlDataSource dataSource, + string slotName, + CancellationToken ct + ) => dataSource.Exists("pg_replication_slots", "slot_name ILIKE $1", [slotName], ct); + public static async Task SetupReplicationSlot( this NpgsqlDataSource dataSource, LogicalReplicationConnection connection, - ReplicationSlotSetupOptions options, + ReplicationSlotOptions options, CancellationToken ct ) { @@ -23,49 +28,21 @@ CancellationToken ct { (Subscription.CreateStyle.Never,_) => new None(), (Subscription.CreateStyle.WhenNotExists,true) => new AlreadyExists(), - (Subscription.CreateStyle.WhenNotExists,false) => await Create(connection, slotName, ct).ConfigureAwait(false), - (Subscription.CreateStyle.AlwaysRecreate,true) => await ReCreate(connection, slotName, ct).ConfigureAwait(false), - (Subscription.CreateStyle.AlwaysRecreate, false) => await Create(connection, slotName, ct).ConfigureAwait(false), + (Subscription.CreateStyle.WhenNotExists,false) => await connection.Create(slotName, ct).ConfigureAwait(false), + (Subscription.CreateStyle.AlwaysRecreate,true) => await connection.ReCreate(slotName, ct).ConfigureAwait(false), + (Subscription.CreateStyle.AlwaysRecreate, false) => await connection.Create(slotName, ct).ConfigureAwait(false), _ => throw new ArgumentOutOfRangeException(nameof(options.CreateStyle)) }; - static async Task ReCreate( - LogicalReplicationConnection connection, - string slotName, - CancellationToken ct) - { - await connection.DropReplicationSlot(slotName, true, ct).ConfigureAwait(false); - return await Create(connection, slotName, ct).ConfigureAwait(false); - } - - static async Task Create( - LogicalReplicationConnection connection, - string slotName, - CancellationToken ct) - { - var result = await connection.CreatePgOutputReplicationSlot( - slotName, - slotSnapshotInitMode: LogicalSlotSnapshotInitMode.Export, - cancellationToken: ct - ).ConfigureAwait(false); - - return new Created(result.SnapshotName!, result.ConsistentPoint); - } } - - private static Task ReplicationSlotExists( - this NpgsqlDataSource dataSource, - string slotName, - CancellationToken ct - ) => dataSource.Exists("pg_replication_slots", "slot_name = $1", [slotName], ct); - - public record ReplicationSlotSetupOptions( + + public record ReplicationSlotOptions( string SlotName = $"{TableDescriptorBuilder.MessageTable.DefaultName}_slot", Subscription.CreateStyle CreateStyle = Subscription.CreateStyle.WhenNotExists, - bool Binary = false //https://www.postgresql.org/docs/current/sql-createsubscription.html#SQL-CREATESUBSCRIPTION-WITH-BINARY + bool Binary = + false //https://www.postgresql.org/docs/current/sql-createsubscription.html#SQL-CREATESUBSCRIPTION-WITH-BINARY ); - public abstract record CreateReplicationSlotResult { public record None: CreateReplicationSlotResult; @@ -75,3 +52,29 @@ public record AlreadyExists: CreateReplicationSlotResult; public record Created(string SnapshotName, NpgsqlLogSequenceNumber LogSequenceNumber): CreateReplicationSlotResult; } } + +internal static class LogicalReplicationConnectionExtensions +{ + internal static async Task Create( + this LogicalReplicationConnection connection, + string slotName, + CancellationToken ct) + { + var result = await connection.CreatePgOutputReplicationSlot( + slotName, + slotSnapshotInitMode: LogicalSlotSnapshotInitMode.Export, + cancellationToken: ct + ).ConfigureAwait(false); + + return new Created(result.SnapshotName!, result.ConsistentPoint); + } + + internal static async Task ReCreate( + this LogicalReplicationConnection connection, + string slotName, + CancellationToken ct) + { + await connection.DropReplicationSlot(slotName, true, ct).ConfigureAwait(false); + return await connection.Create(slotName, ct).ConfigureAwait(false); + } +} diff --git a/src/Blumchen/Subscriptions/MimeType.cs b/src/Blumchen/Subscriptions/MimeType.cs index 8bb3ef9..4874cdd 100644 --- a/src/Blumchen/Subscriptions/MimeType.cs +++ b/src/Blumchen/Subscriptions/MimeType.cs @@ -1,7 +1,9 @@ namespace Blumchen.Subscriptions; -#pragma warning disable CS1591 public abstract record MimeType(string mimeType) { - public record Json(): MimeType("application/json"); + internal record JsonMimeType(): MimeType("application/json"); + + public static MimeType Json => new JsonMimeType(); + } diff --git a/src/Blumchen/Subscriptions/Replication/Envelope.cs b/src/Blumchen/Subscriptions/Replication/Envelope.cs new file mode 100644 index 0000000..d8e4eae --- /dev/null +++ b/src/Blumchen/Subscriptions/Replication/Envelope.cs @@ -0,0 +1,7 @@ +namespace Blumchen.Subscriptions.Replication; + +public interface IEnvelope; + +public sealed record OkEnvelope(object Value, string MessageType): IEnvelope; + +public sealed record KoEnvelope(Exception Error, string Id): IEnvelope; diff --git a/src/Blumchen/Subscriptions/Replication/IMessageHandler.cs b/src/Blumchen/Subscriptions/Replication/IMessageHandler.cs new file mode 100644 index 0000000..12b4d64 --- /dev/null +++ b/src/Blumchen/Subscriptions/Replication/IMessageHandler.cs @@ -0,0 +1,8 @@ +namespace Blumchen.Subscriptions.Replication; + +public interface IMessageHandler; + +public interface IMessageHandler: IMessageHandler where T : class +{ + Task Handle(T value); +} diff --git a/src/Blumchen/Subscriptions/Replication/IReplicationDataMapper.cs b/src/Blumchen/Subscriptions/Replication/IReplicationDataMapper.cs index 90b75d2..570c08a 100644 --- a/src/Blumchen/Subscriptions/Replication/IReplicationDataMapper.cs +++ b/src/Blumchen/Subscriptions/Replication/IReplicationDataMapper.cs @@ -1,9 +1,12 @@ -using Blumchen.Subscriptions.ReplicationMessageHandlers; +using System.Reflection; using Npgsql; +using Npgsql.Replication.PgOutput; using Npgsql.Replication.PgOutput.Messages; +using System.Text.Json; +using Blumchen.Subscriber; +using ImTools; namespace Blumchen.Subscriptions.Replication; -#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member public interface IReplicationDataMapper { @@ -11,3 +14,84 @@ public interface IReplicationDataMapper Task ReadFromReplication(InsertMessage insertMessage, CancellationToken ct); } + +internal class ReplicationDataMapper(IDictionary> mapperSelector) + : IReplicationDataMapper +{ + + private ImHashMap _memo = ImHashMap.Empty; + + private static IReplicationJsonBMapper SelectMapper(string key, IDictionary> registry) => + registry.FindByMultiKey(key, OptionsBuilder.WildCard)?.Item1 + ?? throw new NotSupportedException($"Unexpected message `{key}`"); + + public async Task ReadFromReplication(InsertMessage insertMessage, CancellationToken ct) + { + var id = string.Empty; + var columnNumber = 0; + var typeName = string.Empty; + await foreach (var column in insertMessage.NewRow.ConfigureAwait(false)) + { + try + { + switch (columnNumber) + { + case 0: + id = column.Kind == TupleDataKind.BinaryValue + ? (await column.Get(ct).ConfigureAwait(false)).ToString() + : await column.Get(ct).ConfigureAwait(false); + break; + case 1: + using (var textReader = column.GetTextReader()) + { + typeName = await textReader.ReadToEndAsync(ct).ConfigureAwait(false); + break; + } + case 2 when column.GetDataTypeName().Equals("jsonb", StringComparison.OrdinalIgnoreCase): + + + _memo = _memo.AddOrGetEntry(typeName.GetHashCode(), + new KVEntry(typeName.GetHashCode(), typeName, + SelectMapper(typeName, mapperSelector))); + return await _memo.GetValueOrDefault(typeName.GetHashCode(), typeName) + .ReadFromReplication(id, typeName, column, ct); + + } + } + catch (Exception ex) when (ex is ArgumentException or NotSupportedException or InvalidOperationException + or JsonException) + { + return new KoEnvelope(ex, id); + } + + columnNumber++; + } + + throw new InvalidOperationException("You should not get here"); + } + + private IReplicationJsonBMapper Get(string typeName) + { + if (_memo.TryFind(typeName, out var mapper)) return mapper; + mapper = SelectMapper(typeName, mapperSelector); + _memo = _memo.AddOrUpdate(typeName, mapper); + return mapper; + } + + public async Task ReadFromSnapshot(NpgsqlDataReader reader, CancellationToken ct) + { + long id = default; + try + { + id = reader.GetInt64(0); + var typeName = reader.GetString(1); + var mapper = Get(typeName); + return await mapper.ReadFromSnapshot(typeName, id, reader, ct); + } + catch (Exception ex) when (ex is ArgumentException or NotSupportedException or InvalidOperationException + or JsonException) + { + return new KoEnvelope(ex, id.ToString()); + } + } +} diff --git a/src/Blumchen/Subscriptions/Replication/IReplicationDataReader.cs b/src/Blumchen/Subscriptions/Replication/IReplicationDataReader.cs new file mode 100644 index 0000000..3e86ccf --- /dev/null +++ b/src/Blumchen/Subscriptions/Replication/IReplicationDataReader.cs @@ -0,0 +1,56 @@ +using Blumchen.Serialization; +using Npgsql; +using Npgsql.Replication.PgOutput; + +namespace Blumchen.Subscriptions.Replication; + +internal interface IReplicationDataReader +{ + Task Read(ReplicationValue replicationValue, CancellationToken ct, Type? type = default); + Task Read(NpgsqlDataReader reader, CancellationToken ct, Type? type = default); +} +internal class ObjectReplicationDataReader: IReplicationDataReader +{ + public Task Read(ReplicationValue replicationValue, CancellationToken ct, Type? type = default) + => replicationValue.Get(ct).AsTask(); + + + public Task Read(NpgsqlDataReader reader, CancellationToken ct, Type? type = default) + => reader.GetFieldValueAsync(2, ct); +} + +internal class StringReplicationDataReader: IReplicationDataReader +{ + public async Task Read(ReplicationValue replicationValue, CancellationToken ct, Type? type = default) + { + using var tr = replicationValue.GetTextReader(); + return await tr.ReadToEndAsync(ct).ConfigureAwait(false); + } + + + public async Task Read(NpgsqlDataReader reader, CancellationToken ct, Type? type = default) + { + using var tr = await reader.GetTextReaderAsync(2, ct).ConfigureAwait(false); + return await tr.ReadToEndAsync(ct).ConfigureAwait(false); + } +} + +internal class JsonReplicationDataReader(JsonTypeResolver resolver): IReplicationDataReader +{ + public async Task Read(ReplicationValue replicationValue, CancellationToken ct, Type? type = default) + { + ArgumentNullException.ThrowIfNull(type); + await using var stream = replicationValue.GetStream(); + return await JsonSerialization.FromJsonAsync(type, stream, resolver.SerializationContext, ct) + .ConfigureAwait(false); + } + + public async Task Read(NpgsqlDataReader reader, CancellationToken ct, Type? type = default) + { + ArgumentNullException.ThrowIfNull(type); + var stream = await reader.GetStreamAsync(2, ct).ConfigureAwait(false); + await using var stream1 = stream.ConfigureAwait(false); + return await JsonSerialization.FromJsonAsync(type, stream, resolver.SerializationContext, ct) + .ConfigureAwait(false); + } +} diff --git a/src/Blumchen/Subscriptions/Replication/IReplicationJsonBMapper.cs b/src/Blumchen/Subscriptions/Replication/IReplicationJsonBMapper.cs new file mode 100644 index 0000000..dd267c4 --- /dev/null +++ b/src/Blumchen/Subscriptions/Replication/IReplicationJsonBMapper.cs @@ -0,0 +1,73 @@ +using Blumchen.Serialization; +using Npgsql; +using Npgsql.Replication.PgOutput; +using System.Text.Json.Serialization.Metadata; +using System.Text.Json; + +namespace Blumchen.Subscriptions.Replication; + +public interface IReplicationJsonBMapper +{ + Task ReadFromReplication(string id, string typeName, ReplicationValue column, + CancellationToken ct); + + Task ReadFromSnapshot(string typeName, long id, NpgsqlDataReader reader, CancellationToken ct); +} +internal class ReplicationDataMapper( + IReplicationDataReader replicationDataReader + , ITypeResolver? resolver = default + ): IReplicationJsonBMapper +{ + public async Task ReadFromReplication(string id, string typeName, ReplicationValue column, + CancellationToken ct) + { + + try + { + var type = resolver?.Resolve(typeName); + var value = await replicationDataReader.Read(column, ct, type).ConfigureAwait(false) ?? + throw new ArgumentNullException(); + return new OkEnvelope(value, typeName); + } + catch (Exception ex) when (ex is ArgumentException or NotSupportedException or InvalidOperationException + or JsonException) + { + return new KoEnvelope(ex, id); + } + } + + public async Task ReadFromSnapshot(string typeName, long id, NpgsqlDataReader reader, CancellationToken ct) + { + try + { + var eventType = resolver?.Resolve(typeName); + var value = await replicationDataReader.Read(reader, ct, eventType).ConfigureAwait(false) ?? throw new ArgumentNullException(); + return new OkEnvelope(value, typeName); + } + catch (Exception ex) when (ex is ArgumentException or NotSupportedException or InvalidOperationException or JsonException) + { + return new KoEnvelope(ex, id.ToString()); + } + } +} + +internal sealed class ObjectReplicationDataMapper( + IReplicationDataReader replicationDataReader +): ReplicationDataMapper(replicationDataReader) +{ + private static readonly Lazy Lazy = new(() => new(new ObjectReplicationDataReader())); + public static ObjectReplicationDataMapper Instance => Lazy.Value; +} + +internal sealed class StringReplicationDataMapper( + IReplicationDataReader replicationDataReader +): ReplicationDataMapper(replicationDataReader) +{ + private static readonly Lazy Lazy = new(() => new(new StringReplicationDataReader())); + public static StringReplicationDataMapper Instance => Lazy.Value; +} + +internal sealed class JsonReplicationDataMapper( + ITypeResolver resolver, + IReplicationDataReader replicationDataReader +): ReplicationDataMapper(replicationDataReader, resolver); diff --git a/src/Blumchen/Subscriptions/Replication/ReplicationDataMapper.cs b/src/Blumchen/Subscriptions/Replication/ReplicationDataMapper.cs deleted file mode 100644 index 75be676..0000000 --- a/src/Blumchen/Subscriptions/Replication/ReplicationDataMapper.cs +++ /dev/null @@ -1,69 +0,0 @@ -using System.Text.Json; -using Blumchen.Serialization; -using Blumchen.Subscriptions.ReplicationMessageHandlers; -using Npgsql; -using Npgsql.Replication.PgOutput; -using Npgsql.Replication.PgOutput.Messages; - -namespace Blumchen.Subscriptions.Replication; - -internal sealed class ReplicationDataMapper(JsonTypeResolver resolver): IReplicationDataMapper -{ - public async Task ReadFromReplication(InsertMessage insertMessage, CancellationToken ct) - { - var id = string.Empty; - var columnNumber = 0; - var typeName = string.Empty; - await foreach (var column in insertMessage.NewRow.ConfigureAwait(false)) - { - try - { - switch (columnNumber) - { - case 0: - id = column.Kind == TupleDataKind.BinaryValue - ? (await column.Get(ct).ConfigureAwait(false)).ToString() - : await column.Get(ct).ConfigureAwait(false); - break; - case 1: - using (var textReader = column.GetTextReader()) - { - typeName = await textReader.ReadToEndAsync(ct).ConfigureAwait(false); - break; - } - case 2 when column.GetDataTypeName().Equals("jsonb", StringComparison.OrdinalIgnoreCase): - { - var type = resolver.Resolve(typeName); - ArgumentNullException.ThrowIfNull(type, typeName); - return new OkEnvelope(await JsonSerialization.FromJsonAsync(type, column.GetStream(), resolver.SerializationContext, ct).ConfigureAwait(false)); - } - } - } - catch (Exception ex) when (ex is ArgumentException or NotSupportedException or InvalidOperationException or JsonException) - { - return new KoEnvelope(ex,id); - } - columnNumber++; - } - throw new InvalidOperationException("You should not get here"); - } - - public async Task ReadFromSnapshot(NpgsqlDataReader reader, CancellationToken ct) - { - long id = default; - try - { - id = reader.GetInt64(0); - var eventTypeName = reader.GetString(1); - var eventType = resolver.Resolve(eventTypeName); - ArgumentNullException.ThrowIfNull(eventType, eventTypeName); - var stream = await reader.GetStreamAsync(2, ct).ConfigureAwait(false); - await using var stream1 = stream.ConfigureAwait(false); - return new OkEnvelope(await JsonSerialization.FromJsonAsync(eventType, stream, resolver.SerializationContext, ct).ConfigureAwait(false)); - } - catch (Exception ex) when (ex is ArgumentException or NotSupportedException or InvalidOperationException or JsonException) - { - return new KoEnvelope(ex, id.ToString()); - } - } -} diff --git a/src/Blumchen/Subscriptions/SnapshotReader/SnapshotReader.cs b/src/Blumchen/Subscriptions/Replication/SnapshotReader.cs similarity index 77% rename from src/Blumchen/Subscriptions/SnapshotReader/SnapshotReader.cs rename to src/Blumchen/Subscriptions/Replication/SnapshotReader.cs index 5603e79..368a95a 100644 --- a/src/Blumchen/Subscriptions/SnapshotReader/SnapshotReader.cs +++ b/src/Blumchen/Subscriptions/Replication/SnapshotReader.cs @@ -1,11 +1,8 @@ using System.Runtime.CompilerServices; using Blumchen.Database; -using Blumchen.Subscriptions.Replication; -using Blumchen.Subscriptions.ReplicationMessageHandlers; using Npgsql; -namespace Blumchen.Subscriptions.SnapshotReader; -#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member +namespace Blumchen.Subscriptions.Replication; public static class SnapshotReader { diff --git a/src/Blumchen/Subscriptions/ReplicationMessageHandlers/Envelope.cs b/src/Blumchen/Subscriptions/ReplicationMessageHandlers/Envelope.cs deleted file mode 100644 index 099a627..0000000 --- a/src/Blumchen/Subscriptions/ReplicationMessageHandlers/Envelope.cs +++ /dev/null @@ -1,8 +0,0 @@ -namespace Blumchen.Subscriptions.ReplicationMessageHandlers; -#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member - -public interface IEnvelope; - -public sealed record OkEnvelope(object Value): IEnvelope; - -public sealed record KoEnvelope(Exception Error, string Id): IEnvelope; diff --git a/src/Blumchen/Subscriptions/Subscription.cs b/src/Blumchen/Subscriptions/Subscription.cs index 95854af..8d60a5a 100644 --- a/src/Blumchen/Subscriptions/Subscription.cs +++ b/src/Blumchen/Subscriptions/Subscription.cs @@ -1,55 +1,58 @@ +using System.Dynamic; using System.Reflection; using System.Runtime.CompilerServices; using Blumchen.Database; -using Blumchen.Serialization; +using Blumchen.Subscriber; using Blumchen.Subscriptions.Management; -using Blumchen.Subscriptions.ReplicationMessageHandlers; -using Blumchen.Subscriptions.SnapshotReader; -using Microsoft.Extensions.Logging; +using Blumchen.Subscriptions.Replication; +using ImTools; using Npgsql; using Npgsql.Replication; using Npgsql.Replication.PgOutput; using Npgsql.Replication.PgOutput.Messages; +using static Blumchen.Subscriptions.Management.ReplicationSlotManagement.CreateReplicationSlotResult; namespace Blumchen.Subscriptions; -#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member - -using static PublicationManagement; -using static ReplicationSlotManagement; -using static ReplicationSlotManagement.CreateReplicationSlotResult; public sealed class Subscription: IAsyncDisposable { + private LogicalReplicationConnection? _connection; + private readonly OptionsBuilder _builder = new(); + + private ImHashMap _messageHandlers = ImHashMap.Empty; + public enum CreateStyle { WhenNotExists, AlwaysRecreate, Never } - private static LogicalReplicationConnection? _connection; - private static readonly SubscriptionOptionsBuilder Builder = new(); - private ISubscriptionOptions? _options; + public async IAsyncEnumerable Subscribe( - Func builder, - ILoggerFactory? loggerFactory = null, + Func builder, [EnumeratorCancellation] CancellationToken ct = default ) { - _options = builder(Builder).Build(); - var (connectionString, publicationSetupOptions, replicationSlotSetupOptions, errorProcessor, replicationDataMapper, registry) = _options; - var dataSourceBuilder = new NpgsqlDataSourceBuilder(connectionString); - dataSourceBuilder.UseLoggerFactory(loggerFactory); + await foreach (var _ in Subscribe(builder(_builder).Build(), ct)) + yield return _; + } - var dataSource = dataSourceBuilder.Build(); + internal async IAsyncEnumerable Subscribe( + ISubscriberOptions subscriberOptions, + [EnumeratorCancellation] CancellationToken ct = default + ) + { + var (dataSource, connectionStringBuilder, publicationSetupOptions, replicationSlotSetupOptions, errorProcessor, + registry) = subscriberOptions; await dataSource.EnsureTableExists(publicationSetupOptions.TableDescriptor, ct).ConfigureAwait(false); - _connection = new LogicalReplicationConnection(connectionString); + _connection = new LogicalReplicationConnection(connectionStringBuilder.ConnectionString); await _connection.Open(ct).ConfigureAwait(false); - await dataSource.SetupPublication(publicationSetupOptions, ct).ConfigureAwait(false); - var result = await dataSource.SetupReplicationSlot(_connection, replicationSlotSetupOptions, ct).ConfigureAwait(false); - + var result = await dataSource.SetupReplicationSlot(_connection, replicationSlotSetupOptions, ct) + .ConfigureAwait(false); + var replicationDataMapper = new ReplicationDataMapper(registry); PgOutputReplicationSlot slot; if (result is not Created created) @@ -65,21 +68,29 @@ public async IAsyncEnumerable Subscribe( ) ); - await foreach (var envelope in ReadExistingRowsFromSnapshot(dataSource, created.SnapshotName, _options, ct).ConfigureAwait(false)) - await foreach (var subscribe in ProcessEnvelope(envelope, registry, errorProcessor).WithCancellation(ct).ConfigureAwait(false)) - yield return subscribe; + await foreach (var envelope in ReadExistingRowsFromSnapshot(dataSource, created.SnapshotName, + replicationDataMapper, publicationSetupOptions.TableDescriptor, + publicationSetupOptions.RegisteredTypes, ct).ConfigureAwait(false)) + { + await foreach (var subscribe in ProcessEnvelope(envelope, registry, errorProcessor).WithCancellation(ct) + .ConfigureAwait(false)) + yield return subscribe; + } } await foreach (var message in _connection.StartReplication(slot, - new PgOutputReplicationOptions(publicationSetupOptions.PublicationName, 1, replicationSlotSetupOptions.Binary), ct).ConfigureAwait(false)) + new PgOutputReplicationOptions(publicationSetupOptions.PublicationName, 1, + replicationSlotSetupOptions.Binary), ct).ConfigureAwait(false)) { if (message is InsertMessage insertMessage) { var envelope = await replicationDataMapper.ReadFromReplication(insertMessage, ct).ConfigureAwait(false); - await foreach (var subscribe in ProcessEnvelope(envelope, registry, errorProcessor).WithCancellation(ct).ConfigureAwait(false)) + await foreach (var subscribe in ProcessEnvelope(envelope, registry, errorProcessor).WithCancellation(ct) + .ConfigureAwait(false)) yield return subscribe; } + // Always call SetReplicationStatus() or assign LastAppliedLsn and LastFlushedLsn individually // so that Npgsql can inform the server which WAL files can be removed/recycled. _connection.SetReplicationStatus(message.WalEnd); @@ -87,57 +98,55 @@ public async IAsyncEnumerable Subscribe( } } - private static async IAsyncEnumerable ProcessEnvelope( + private (IMessageHandler, MethodInfo) GetConsumer( + string messageType, + Type type, + IDictionary> registry + ) + { + if (_messageHandlers.TryFind(messageType, out var tuple)) return tuple; + tuple = MessageHandler(messageType, registry, type); + _messageHandlers = _messageHandlers.AddOrUpdate(messageType, tuple); + return tuple; + + static (IMessageHandler messageHandler, MethodInfo methodInfo) MessageHandler( + string messageType, IDictionary> registry, + Type objType) + { + var (_, messageHandler, methodInfo) = registry.FindByMultiKey(messageType, OptionsBuilder.WildCard) ?? + throw new NotSupportedException( + $"Unregistered type for {objType.AssemblyQualifiedName}"); + return (messageHandler, methodInfo); + } + } + + private async IAsyncEnumerable ProcessEnvelope( IEnvelope envelope, - Dictionary registry, + IDictionary> registry, IErrorProcessor errorProcessor - ) where T:class + ) { switch (envelope) { case KoEnvelope error: - await errorProcessor.Process(error.Error).ConfigureAwait(false); + await errorProcessor.Process(error.Error, error.Id).ConfigureAwait(false); yield break; - case OkEnvelope okEnvelope: + case OkEnvelope(var value, var messageType): { - var obj = okEnvelope.Value; - var objType = obj.GetType(); - var (consumer, methodInfo) = Memoize(registry, objType, Consumer); - await ((Task)methodInfo.Invoke(consumer, [obj])!).ConfigureAwait(false); - yield return (T)envelope; + var (messageHandler, methodInfo) = GetConsumer(messageType, value.GetType(), registry); + await ((Task)methodInfo.Invoke(messageHandler, [value])!).ConfigureAwait(false); + yield return envelope; yield break; } } } - private static readonly Dictionary Cache = []; - - - private static (IConsume consumer, MethodInfo methodInfo) Memoize - ( - Dictionary registry, - Type objType, - Func, Type, (IConsume consumer, MethodInfo methodInfo)> func - ) - { - if (!Cache.TryGetValue(objType, out var entry)) - entry = func(registry, objType); - Cache[objType] = entry; - return entry; - } - private static (IConsume consumer, MethodInfo methodInfo) Consumer(Dictionary registry, Type objType) - { - var consumer = registry[objType] ?? throw new NotSupportedException($"Unregistered type for {objType.AssemblyQualifiedName}"); - var methodInfos = consumer.GetType().GetMethods(BindingFlags.Instance|BindingFlags.Public); - var methodInfo = methodInfos.SingleOrDefault(mi=>mi.GetParameters().Any(pa => pa.ParameterType == objType)) - ?? throw new NotSupportedException($"Unregistered type for {objType.AssemblyQualifiedName}"); - return (consumer, methodInfo); - } - private static async IAsyncEnumerable ReadExistingRowsFromSnapshot( NpgsqlDataSource dataSource, string snapshotName, - ISubscriptionOptions options, + IReplicationDataMapper dataMapper, + TableDescriptorBuilder.MessageTable tableDescriptor, + ISet registeredTypes, [EnumeratorCancellation] CancellationToken ct = default ) { @@ -145,16 +154,16 @@ private static async IAsyncEnumerable ReadExistingRowsFromSnapshot( await using var connection1 = connection.ConfigureAwait(false); await foreach (var row in connection.GetRowsFromSnapshot( snapshotName, - options.PublicationOptions.TableDescriptor, - options.DataMapper, - options.PublicationOptions.TypeResolver.Keys().ToHashSet(), + tableDescriptor, + dataMapper, + registeredTypes, ct).ConfigureAwait(false)) yield return row; } public async ValueTask DisposeAsync() { - if(_connection != null) + if (_connection != null) await _connection.DisposeAsync().ConfigureAwait(false); } } diff --git a/src/Blumchen/Subscriptions/SubscriptionOptionsBuilder.cs b/src/Blumchen/Subscriptions/SubscriptionOptionsBuilder.cs deleted file mode 100644 index 13a97d8..0000000 --- a/src/Blumchen/Subscriptions/SubscriptionOptionsBuilder.cs +++ /dev/null @@ -1,140 +0,0 @@ -using Blumchen.Serialization; -using Blumchen.Subscriptions.Management; -using Blumchen.Subscriptions.Replication; -using JetBrains.Annotations; -using System.Text.Json.Serialization; - -namespace Blumchen.Subscriptions; -#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member -public sealed class SubscriptionOptionsBuilder -{ - private static string? _connectionString; - private static PublicationManagement.PublicationSetupOptions _publicationSetupOptions; - private static ReplicationSlotManagement.ReplicationSlotSetupOptions? _replicationSlotSetupOptions; - private static IReplicationDataMapper? _dataMapper; - private readonly Dictionary _registry = []; - private IErrorProcessor? _errorProcessor; - private INamingPolicy? _namingPolicy; - private JsonSerializerContext? _jsonSerializerContext; - private static readonly TableDescriptorBuilder TableDescriptorBuilder = new(); - private TableDescriptorBuilder.MessageTable? _messageTable; - - - static SubscriptionOptionsBuilder() - { - _connectionString = null; - _publicationSetupOptions = new(); - _replicationSlotSetupOptions = default; - _dataMapper = default; - } - - - [UsedImplicitly] - public SubscriptionOptionsBuilder WithTable( - Func builder) - { - _messageTable = builder(TableDescriptorBuilder).Build(); - return this; - } - - [UsedImplicitly] - public SubscriptionOptionsBuilder ConnectionString(string connectionString) - { - _connectionString = connectionString; - return this; - } - - [UsedImplicitly] - public SubscriptionOptionsBuilder NamingPolicy(INamingPolicy namingPolicy) - { - _namingPolicy = namingPolicy; - return this; - } - - [UsedImplicitly] - public SubscriptionOptionsBuilder JsonContext(JsonSerializerContext jsonSerializerContext) - { - _jsonSerializerContext = jsonSerializerContext; - return this; - } - - [UsedImplicitly] - public SubscriptionOptionsBuilder WithPublicationOptions(PublicationManagement.PublicationSetupOptions publicationOptions) - { - _publicationSetupOptions = - publicationOptions with { TypeResolver = _publicationSetupOptions.TypeResolver}; - return this; - } - - [UsedImplicitly] - public SubscriptionOptionsBuilder WithReplicationOptions(ReplicationSlotManagement.ReplicationSlotSetupOptions replicationSlotOptions) - { - _replicationSlotSetupOptions = replicationSlotOptions; - return this; - } - - [UsedImplicitly] - public SubscriptionOptionsBuilder Consumes(TU consumer) where T : class - where TU : class, IConsumes - { - _registry.TryAdd(typeof(T), consumer); - return this; - } - - [UsedImplicitly] - public SubscriptionOptionsBuilder WithErrorProcessor(IErrorProcessor? errorProcessor) - { - _errorProcessor = errorProcessor; - return this; - } - - internal ISubscriptionOptions Build() - { - ArgumentNullException.ThrowIfNull(_connectionString); - ArgumentNullException.ThrowIfNull(_jsonSerializerContext); - ArgumentNullException.ThrowIfNull(_messageTable); - - var typeResolver = new JsonTypeResolver(_jsonSerializerContext, _namingPolicy); - foreach (var type in _registry.Keys) typeResolver.WhiteList(type); - _dataMapper = new ReplicationDataMapper(typeResolver); - _publicationSetupOptions = _publicationSetupOptions with { TypeResolver = typeResolver, TableDescriptor = _messageTable}; - - Ensure(() =>_registry.Keys.Except(_publicationSetupOptions.TypeResolver.Values()), "Unregistered types:{0}"); - Ensure(() => _publicationSetupOptions.TypeResolver.Values().Except(_registry.Keys), "Unregistered consumer for type:{0}"); - if (_registry.Count == 0)_registry.Add(typeof(object), new ObjectTracingConsumer()); - - return new SubscriptionOptions( - _connectionString, - _publicationSetupOptions, - _replicationSlotSetupOptions ?? new ReplicationSlotManagement.ReplicationSlotSetupOptions(), - _errorProcessor ?? new ConsoleOutErrorProcessor(), - _dataMapper, - _registry); - static void Ensure(Func> evalFn, string formattedMsg) - { - var misses = evalFn().ToArray(); - if (misses.Length > 0) throw new Exception(string.Format(formattedMsg, string.Join(", ", misses.Select(t => $"'{t.Name}'")))); - } - - } -} - -public class ObjectTracingConsumer: IConsumes -{ - private static ulong _counter = 0; - public Task Handle(object value) - { - Interlocked.Increment(ref _counter); - return Console.Out.WriteLineAsync(); - } -} - -public interface IErrorProcessor -{ - Func Process { get; } -} - -public record ConsoleOutErrorProcessor: IErrorProcessor -{ - public Func Process => exception => Console.Out.WriteLineAsync($"record id:{0} resulted in error:{exception.Message}"); -} diff --git a/src/Blumchen/MessageTableOptions.cs b/src/Blumchen/TableDescriptorBuilder.cs similarity index 80% rename from src/Blumchen/MessageTableOptions.cs rename to src/Blumchen/TableDescriptorBuilder.cs index a7fe0b1..4a2147e 100644 --- a/src/Blumchen/MessageTableOptions.cs +++ b/src/Blumchen/TableDescriptorBuilder.cs @@ -1,16 +1,16 @@ using Blumchen.Subscriptions; +using JetBrains.Annotations; using NpgsqlTypes; namespace Blumchen; -#pragma warning disable CS1591 public record TableDescriptorBuilder { private MessageTable TableDescriptor { get; set; } = new(); public MessageTable Build() => TableDescriptor.Build(); - public TableDescriptorBuilder Name(string eventsTable) + public TableDescriptorBuilder Named(string eventsTable) { TableDescriptor = new MessageTable(eventsTable); return this; @@ -22,9 +22,9 @@ public TableDescriptorBuilder Id(string name) return this; } - public TableDescriptorBuilder MessageData(string name, MimeType mime) + public TableDescriptorBuilder MessageData(string name) { - TableDescriptor = TableDescriptor with { Data = new Column.Data(name), MimeType = mime }; + TableDescriptor = TableDescriptor with { Data = new Column.Data(name), MimeType = MimeType.Json }; return this; } @@ -34,21 +34,25 @@ public TableDescriptorBuilder MessageType(string name, int dimension = 250) return this; } + [UsedImplicitly] + public TableDescriptorBuilder UseDefaults() => this; + public record MessageTable(string Name = MessageTable.DefaultName) { internal const string DefaultName = "outbox"; public Column.Id Id { get; internal init; } = Column.Id.Default(); public Column.MessageType MessageType { get; internal init; } = Column.MessageType.Default(); public Column.Data Data { get; internal init; } = Column.Data.Default(); - public MimeType MimeType { get; internal init; } = new MimeType.Json(); + public MimeType MimeType { get; internal init; } = MimeType.Json; public MessageTable Build() => this; - public override string ToString() => @$" + public override string ToString() => $""" CREATE TABLE IF NOT EXISTS {Name} ( - {Id} PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY, + {Id} PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY, {MessageType} NOT NULL, - {Data} NOT NULL - );"; + {Data} NOT NULL + ); + """; } public record Column(string Name, NpgsqlDbType Type) diff --git a/src/PostgresOutboxPatternWithCDC.NET.Tests/LogicalReplicationTest.cs b/src/PostgresOutboxPatternWithCDC.NET.Tests/LogicalReplicationTest.cs deleted file mode 100644 index 93195a2..0000000 --- a/src/PostgresOutboxPatternWithCDC.NET.Tests/LogicalReplicationTest.cs +++ /dev/null @@ -1,96 +0,0 @@ -using Commons.Events; -using PostgresOutbox.Events; -using PostgresOutbox.Serialization; -using PostgresOutbox.Subscriptions; -using PostgresOutbox.Subscriptions.Replication; -using Xunit.Abstractions; -using static PostgresOutbox.Subscriptions.Management.PublicationManagement; -using static PostgresOutbox.Subscriptions.Management.ReplicationSlotManagement; - -namespace PostgresOutboxPatternWithCDC.NET.Tests; - -public class LogicalReplicationTest -{ - private readonly ITestOutputHelper testOutputHelper; - - public LogicalReplicationTest(ITestOutputHelper testOutputHelper) - { - this.testOutputHelper = testOutputHelper; - } - - [Fact] - public async Task WALSubscriptionForNewEventsShouldWork() - { - var cancellationTokenSource = new CancellationTokenSource(); - var ct = cancellationTokenSource.Token; - - var eventsTable = await CreateEventsTable(ConnectrionString, ct); - - var subscriptionOptions = new SubscriptionOptions( - ConnectrionString, - new PublicationSetupOptions(Randomise("events_pub"),eventsTable), - new ReplicationSlotSetupOptions(Randomise("events_slot")), - new EventDataMapper() - ); - var subscription = new Subscription(); - - var events = subscription.Subscribe(subscriptionOptions, ct); - - var @event = new UserCreated(Guid.NewGuid(), Guid.NewGuid().ToString()); - await EventsAppender.AppendAsync(eventsTable, @event, ConnectrionString, ct); - - await foreach (var readEvent in events.WithCancellation(ct)) - { - testOutputHelper.WriteLine(JsonSerialization.ToJson(readEvent)); - Assert.Equal(@event, readEvent); - return; - } - } - - [Fact] - public async Task WALSubscriptionForOldEventsShouldWork() - { - var cancellationTokenSource = new CancellationTokenSource(); - var ct = cancellationTokenSource.Token; - - var eventsTable = await CreateEventsTable(ConnectrionString, ct); - - var subscriptionOptions = new SubscriptionOptions( - ConnectrionString, - new PublicationSetupOptions(Randomise("events_pub"),eventsTable), - new ReplicationSlotSetupOptions(Randomise("events_slot")), - new EventDataMapper() - ); - var subscription = new Subscription(); - - var @event = new UserCreated(Guid.NewGuid(), Guid.NewGuid().ToString()); - await EventsAppender.AppendAsync(eventsTable, @event, ConnectrionString, ct); - - var events = subscription.Subscribe(subscriptionOptions, ct); - - await foreach (var readEvent in events) - { - testOutputHelper.WriteLine(JsonSerialization.ToJson(readEvent)); - Assert.Equal(@event, readEvent); - return; - } - } - - private async Task CreateEventsTable( - string connectionString, - CancellationToken ct - ) - { - var tableName = Randomise("events"); - - await EventsTable.Create(connectionString, tableName, ct); - - return tableName; - } - - private static string Randomise(string prefix) => - $"{prefix}_{Guid.NewGuid().ToString().Replace("-", "")}"; - - private const string ConnectrionString = - "PORT = 5432; HOST = localhost; TIMEOUT = 15; POOLING = True; MINPOOLSIZE = 1; MAXPOOLSIZE = 100; COMMANDTIMEOUT = 20; DATABASE = 'postgres'; PASSWORD = 'Password12!'; USER ID = 'postgres'"; -} diff --git a/src/PostgresOutboxPatternWithCDC.NET.Tests/PostgresOutboxPatternWithCDC.NET.Tests.csproj b/src/PostgresOutboxPatternWithCDC.NET.Tests/PostgresOutboxPatternWithCDC.NET.Tests.csproj deleted file mode 100644 index 9e0d56c..0000000 --- a/src/PostgresOutboxPatternWithCDC.NET.Tests/PostgresOutboxPatternWithCDC.NET.Tests.csproj +++ /dev/null @@ -1,25 +0,0 @@ - - - - net8.0 - - - - - - runtime; build; native; contentfiles; analyzers; buildtransitive - all - - - - all - runtime; build; native; contentfiles; analyzers; buildtransitive - - - - - - - - - diff --git a/src/PostgresOutboxPatternWithCDC.NET.Tests/Usings.cs b/src/PostgresOutboxPatternWithCDC.NET.Tests/Usings.cs deleted file mode 100644 index 8c927eb..0000000 --- a/src/PostgresOutboxPatternWithCDC.NET.Tests/Usings.cs +++ /dev/null @@ -1 +0,0 @@ -global using Xunit; \ No newline at end of file diff --git a/src/Publisher/Contracts.cs b/src/Publisher/Contracts.cs index e9995e9..2ef3048 100644 --- a/src/Publisher/Contracts.cs +++ b/src/Publisher/Contracts.cs @@ -4,27 +4,33 @@ namespace Publisher; public interface IContract{} -[MessageUrn("user-created:v1")] +[MessageRoutedByUrn("user-created:v1")] internal record UserCreated( Guid Id, string Name = "Created" ):IContract; -[MessageUrn("user-deleted:v1")] +[MessageRoutedByUrn("user-deleted:v1")] internal record UserDeleted( Guid Id, string Name = "Deleted" ): IContract; -[MessageUrn("user-modified:v1")] //subscription ignored +[MessageRoutedByUrn("user-modified:v1")] internal record UserModified( Guid Id, string Name = "Modified" ): IContract; +[MessageRoutedByUrn("user-subscribed:v1")] +internal record UserSubscribed( + Guid Id, + string Name = "Subscribed" +): IContract; [JsonSourceGenerationOptions(WriteIndented = true)] [JsonSerializable(typeof(UserCreated))] [JsonSerializable(typeof(UserDeleted))] [JsonSerializable(typeof(UserModified))] +[JsonSerializable(typeof(UserSubscribed))] internal partial class SourceGenerationContext: JsonSerializerContext; diff --git a/src/Publisher/Program.cs b/src/Publisher/Program.cs index dae2987..28cfaf5 100644 --- a/src/Publisher/Program.cs +++ b/src/Publisher/Program.cs @@ -1,83 +1,151 @@ -using Blumchen.Publications; +using Blumchen.Database; +using Blumchen.Publisher; using Blumchen.Serialization; +using CommandLine; +using CommandLine.Text; using Commons; +using Microsoft.Extensions.Logging; using Npgsql; using Publisher; using UserCreated = Publisher.UserCreated; using UserDeleted = Publisher.UserDeleted; using UserModified = Publisher.UserModified; +using UserSubscribed = Publisher.UserSubscribed; Console.Title = typeof(Program).Assembly.GetName().Name!; -Console.WriteLine("How many messages do you want to publish?(press CTRL+C to exit):"); -var resolver = new PublisherSetupOptionsBuilder() +var generator = Options.Generator; + +var cts = new CancellationTokenSource(); + +var resolver = await new OptionsBuilder() .JsonContext(SourceGenerationContext.Default) .NamingPolicy(new AttributeNamingPolicy()) - .Build(); + .WithTable(builder => builder.UseDefaults()) //default, but explicit + .Build() + .EnsureTable(Settings.ConnectionString, cts.Token)//enforce table existence and conformity - db roundtrip + .ConfigureAwait(false); + +//Or you might want to verify at a later stage +var loggerFactory = LoggerFactory.Create(builder => builder + .AddFilter("Microsoft", LogLevel.Warning) + .AddFilter("System", LogLevel.Warning) + .AddFilter("Npgsql", LogLevel.Information) + .AddFilter("Blumchen", LogLevel.Trace) + .AddFilter("Publisher", LogLevel.Debug) + .AddSimpleConsole()); +var logger = loggerFactory.CreateLogger(); +await new NpgsqlDataSourceBuilder(Settings.ConnectionString) + .UseLoggerFactory(loggerFactory) + .Build() + .EnsureTableExists(resolver.TableDescriptor, cts.Token).ConfigureAwait(false); +//Uncomment to attach debugger +//System.Diagnostics.Debugger.Launch(); +async Task GenerateFn(Options o) => + await Generate(generator.Join(o.MessageTypes, pair => pair.Key, s => s, (pair, s) => pair).ToDictionary(), + o.Count, resolver, logger, cts); -do +if (args.Length > 0) //cli { + Parser.Default.ParseArguments(args) + .WithParsed(options => Task.WaitAll([GenerateFn(options)])) + .WithNotParsed(HandleParseError); +} +else +{ + Parser.Default.ParseArguments("--help".Split()); + do + { + Parser.Default.ParseArguments(Console.ReadLine()?.Split()) + .WithParsed(options => Task.WaitAll([GenerateFn(options)])) + .WithNotParsed(HandleParseError); + + } while (true); +} - var line = Console.ReadLine(); - if (line != null && int.TryParse(line, out var result)) +return; + +static void HandleParseError(IEnumerable errs) => Console.WriteLine("Errors:" + string.Join(',', errs.Select(e => e.Tag))); + +async Task Generate(Dictionary> dictionary, int count, PublisherOptions publisherOptions, + ILogger l, + CancellationTokenSource cancellationTokenSource) +{ + var generatorLength = dictionary.Count; + var messageCount = count / generatorLength; + var ct = cancellationTokenSource.Token; + var connection = new NpgsqlConnection(Settings.ConnectionString); + await using var connection1 = connection.ConfigureAwait(false); + await connection.OpenAsync(ct).ConfigureAwait(false); + //use a command for each message { - var cts = new CancellationTokenSource(); + var tuple = Enumerable.Range(0, count).Select(i => + dictionary.ElementAt(i % generatorLength)); + + var messageByType = string.Join(", ", + dictionary.Keys.Select((key, i) => + $"Publishing {(messageCount + (count % generatorLength > i ? 1 : 0))} {key}")); + l.LogInformation(messageByType); + - var ct = cts.Token; - var connection = new NpgsqlConnection(Settings.ConnectionString); - await using var connection1 = connection.ConfigureAwait(false); - await connection.OpenAsync(ct).ConfigureAwait(false); - //use a command for each message + foreach (var message in tuple.Select(pair => pair.Value())/*.Chunk(10)*/)//Chunking enable batch insert { - var @events = Enumerable.Range(0, result).Select(i => - (i % 3) switch - { - 0 => new UserCreated(Guid.NewGuid()) as object, - 1 => new UserDeleted(Guid.NewGuid()), - _ => new UserModified(Guid.NewGuid()) - }); - foreach (var @event in @events) + var transaction = await connection.BeginTransactionAsync(ct).ConfigureAwait(false); + try { - var transaction = await connection.BeginTransactionAsync(ct).ConfigureAwait(false); - try - { - switch (@event) - { - case UserCreated m: - await MessageAppender.AppendAsync(m, resolver, connection, transaction, ct).ConfigureAwait(false); - break; - case UserDeleted m: - await MessageAppender.AppendAsync( m, resolver, connection, transaction, ct).ConfigureAwait(false); - break; - case UserModified m: - await MessageAppender.AppendAsync(m, resolver, connection, transaction, ct).ConfigureAwait(false); - break; - } - - await transaction.CommitAsync(ct).ConfigureAwait(false); - } - catch (Exception e) - { - await transaction.RollbackAsync(ct).ConfigureAwait(false); - Console.WriteLine(e); - throw; - } + await MessageAppender.AppendAsync(message, publisherOptions, connection, transaction, ct).ConfigureAwait(false); + //OR with typed version + //switch (message) + //{ + // case UserCreated m: + // await MessageAppender.AppendAsync(m, resolver, connection, transaction, ct).ConfigureAwait(false); + // break; + // case UserDeleted m: + // await MessageAppender.AppendAsync( m, resolver, connection, transaction, ct).ConfigureAwait(false); + // break; + // case UserModified m: + // await MessageAppender.AppendAsync(m, resolver, connection, transaction, ct).ConfigureAwait(false); + // break; + // case UserSubscribed m: + // await MessageAppender.AppendAsync(m, resolver, connection, transaction, ct).ConfigureAwait(false); + // break; + //} + + await transaction.CommitAsync(ct).ConfigureAwait(false); + } + catch (Exception e) + { + await transaction.RollbackAsync(ct).ConfigureAwait(false); + l.LogCritical(e, e.Message); + throw; } } - //use a batch command - //{ - // var transaction = await connection.BeginTransactionAsync(ct); - // try - // { - // var @events = Enumerable.Range(0, result) - // .Select(i1 => new UserCreated(Guid.NewGuid(), Guid.NewGuid().ToString())); - // await MessageAppender.AppendAsync(@events, resolver, connection, transaction, ct); - // } - // catch (Exception e) - // { - // Console.WriteLine(e); - // throw; - // } - //} + l.LogInformation("Published {count} messages!", count); } -} while (true); +} + +internal class Options(IEnumerable messageTypes, int count) +{ + internal static readonly Dictionary> Generator = new() + { + { nameof(UserCreated), () => new UserCreated(Guid.NewGuid()) }, + { nameof(UserDeleted), () => new UserDeleted(Guid.NewGuid()) }, + { nameof(UserModified), () => new UserModified(Guid.NewGuid()) }, + { nameof(UserSubscribed), () => new UserSubscribed(Guid.NewGuid()) } + }; + private static readonly int CountByType = TotalCount / Generator.Count; + private static readonly int Mod = TotalCount % CountByType; + private const int TotalCount = 10; + + [Option('t', "type", Required = true, HelpText = "Message type.", Separator = '|')] + public IEnumerable MessageTypes { get; } = messageTypes; + + [Option('c', "count", Required = true, HelpText = "Total number")] + public int Count { get; } = count; + + [Usage] + public static IEnumerable Examples => + [ + new Example($"Publish {string.Join(" and ", Generator.Keys.Select((type,i)=> $"{CountByType + (Mod>i?1:0)} {type}"))} messages", new Options(Generator.Keys, TotalCount)) + ]; +} diff --git a/src/Publisher/Publisher.csproj b/src/Publisher/Publisher.csproj index 86277ed..e6a52a5 100644 --- a/src/Publisher/Publisher.csproj +++ b/src/Publisher/Publisher.csproj @@ -1,4 +1,4 @@ - + net8.0 @@ -9,8 +9,13 @@ - - + + + + + + + diff --git a/src/Subscriber/Contracts.cs b/src/Subscriber/Contracts.cs index 527486b..e3c03ae 100644 --- a/src/Subscriber/Contracts.cs +++ b/src/Subscriber/Contracts.cs @@ -3,20 +3,20 @@ namespace Subscriber { - [MessageUrn("user-created:v1")] + [MessageRoutedByUrn("user-created:v1")] public record UserCreatedContract( Guid Id, string Name ); - [MessageUrn("user-deleted:v1")] - public record UserDeletedContract( - Guid Id, - string Name - ); + [RawRoutedByUrn("user-deleted:v1")] + public class MessageObjects; + + + [RawRoutedByUrn("user-modified:v1")] + internal class MessageString; [JsonSourceGenerationOptions(WriteIndented = true)] [JsonSerializable(typeof(UserCreatedContract))] - [JsonSerializable(typeof(UserDeletedContract))] internal partial class SourceGenerationContext: JsonSerializerContext; } diff --git a/src/Subscriber/Program.cs b/src/Subscriber/Program.cs index c023ac0..1cedf67 100644 --- a/src/Subscriber/Program.cs +++ b/src/Subscriber/Program.cs @@ -1,7 +1,9 @@ using Blumchen.Serialization; using Blumchen.Subscriptions; +using Blumchen.Subscriptions.Replication; using Commons; using Microsoft.Extensions.Logging; +using Npgsql; using Subscriber; #pragma warning disable CS8601 // Possible null reference assignment. @@ -14,27 +16,51 @@ TaskScheduler.UnobservedTaskException += (_,e) => Console.Out.WriteLine(e.Exception.ToString()); var ct = cancellationTokenSource.Token; -var consumer = new Consumer(); var subscription = new Subscription(); await using var subscription1 = subscription.ConfigureAwait(false); try { + + var loggerFactory = LoggerFactory.Create(builder => builder + .AddFilter("Microsoft", LogLevel.Warning) + .AddFilter("System", LogLevel.Warning) + .AddFilter("Npgsql", LogLevel.Information) + .AddFilter("Blumchen", LogLevel.Debug) + .AddFilter("Subscriber", LogLevel.Trace) + .AddSimpleConsole()); + var logger = loggerFactory.CreateLogger("Subscriber"); + var dataSourceBuilder = new NpgsqlDataSourceBuilder(Settings.ConnectionString) + .UseLoggerFactory(loggerFactory); var cursor = subscription.Subscribe( - builder => builder - .ConnectionString(Settings.ConnectionString) - .WithTable(options => options - .Id("id") - .MessageType("message_type") - .MessageData("data", new MimeType.Json()) - ) - .NamingPolicy(new AttributeNamingPolicy()) - .JsonContext(SourceGenerationContext.Default) - .Consumes(consumer) - .Consumes(consumer), LoggerFactory.Create(builder => builder.AddConsole()), ct + builder => + { + var consumer = new Consumer(loggerFactory.CreateLogger()); + return builder + .DataSource(dataSourceBuilder.Build()) + .ConnectionString(Settings.ConnectionString) + .WithTable(options => options + .Id("id") + .MessageType("message_type") + .MessageData("data") + ) + .Consumes(consumer, opts => + opts + .WithJsonContext(SourceGenerationContext.Default) + .AndNamingPolicy(new AttributeNamingPolicy())) + .ConsumesRawString(consumer) + .ConsumesRawObject(consumer); + } + //OR + //.ConsumesRawStrings(consumer) + //OR + //.ConsumesRawObjects(consumer) + , ct ).GetAsyncEnumerator(ct); await using var cursor1 = cursor.ConfigureAwait(false); - while (await cursor.MoveNextAsync().ConfigureAwait(false) && !ct.IsCancellationRequested); + while (await cursor.MoveNextAsync().ConfigureAwait(false) && !ct.IsCancellationRequested) + if(logger.IsEnabled(LogLevel.Trace)) + logger.LogTrace("{message} processed", cursor.Current); } catch (Exception e) { @@ -45,11 +71,27 @@ namespace Subscriber { - internal class Consumer: - IConsumes, - IConsumes + internal class Consumer(ILogger logger): + IMessageHandler, + IMessageHandler, + IMessageHandler { - public Task Handle(UserCreatedContract value) => Console.Out.WriteLineAsync(JsonSerialization.ToJson(value, SourceGenerationContext.Default.UserCreatedContract)); - public Task Handle(UserDeletedContract value) => Console.Out.WriteLineAsync(JsonSerialization.ToJson(value, SourceGenerationContext.Default.UserDeletedContract)); + private int _completed; + + private Task ReportSuccess(int count) + { + if (logger.IsEnabled(LogLevel.Debug)) + logger.LogDebug($"Read #{count} messages {typeof(T).FullName}"); + return Task.CompletedTask; + } + + private Task Handle(T value) => + ReportSuccess(Interlocked.Increment(ref _completed)); + + public Task Handle(string value) => Handle(value); + public Task Handle(object value) => Handle(value); + public Task Handle(UserCreatedContract value) => + Handle(value); + } } diff --git a/src/Subscriber/Subscriber.csproj b/src/Subscriber/Subscriber.csproj index d5d5273..1cc04fd 100644 --- a/src/Subscriber/Subscriber.csproj +++ b/src/Subscriber/Subscriber.csproj @@ -5,12 +5,14 @@ net8.0 enable enable - true + True + true false + diff --git a/src/SubscriberWorker/Contracts.cs b/src/SubscriberWorker/Contracts.cs new file mode 100644 index 0000000..fc4719a --- /dev/null +++ b/src/SubscriberWorker/Contracts.cs @@ -0,0 +1,29 @@ +using System.Text.Json.Serialization; +using Blumchen.Serialization; + +namespace SubscriberWorker +{ + [MessageRoutedByUrn("user-created:v1")] + public record UserCreatedContract( + Guid Id, + string Name + ); + + [MessageRoutedByUrn("user-deleted:v1")] + public record UserDeletedContract( + Guid Id, + string Name + ); + + [MessageRoutedByUrn("user-modified:v1")] //subscription ignored + public record UserModifiedContract( + Guid Id, + string Name = "Modified" + ); + + [JsonSourceGenerationOptions(WriteIndented = true)] + [JsonSerializable(typeof(UserCreatedContract))] + [JsonSerializable(typeof(UserDeletedContract))] + [JsonSerializable(typeof(UserModifiedContract))] + internal partial class SourceGenerationContext: JsonSerializerContext; +} diff --git a/src/SubscriberWorker/MessageHandler.cs b/src/SubscriberWorker/MessageHandler.cs new file mode 100644 index 0000000..d9c490e --- /dev/null +++ b/src/SubscriberWorker/MessageHandler.cs @@ -0,0 +1,38 @@ +using Blumchen.Subscriptions.Replication; +using Microsoft.Extensions.Logging; +#pragma warning disable CS9113 // Parameter is unread. + +namespace SubscriberWorker; + +public class HandlerBase(ILogger logger) +{ + private int _counter; + private int _completed; + + private Task ReportSuccess(int count) + { + if (logger.IsEnabled(LogLevel.Debug)) + logger.LogDebug($"Read #{count} messages {typeof(T).FullName}"); + return Task.CompletedTask; + } + + protected Task Handle(T value) => + Interlocked.Increment(ref _counter) % 10 == 0 + //Simulating some exception on out of process dependencies + ? Task.FromException(new Exception($"Error on publishing {typeof(T).FullName}")) + : ReportSuccess(Interlocked.Increment(ref _completed)); +} + +public class HandleImpl2(ILogger logger) + : HandlerBase(logger), IMessageHandler +{ + public Task Handle(UserDeletedContract value) => Handle(value); +} + +public class HandleImpl1(ILogger logger) + : HandlerBase(logger), IMessageHandler, IMessageHandler +{ + public Task Handle(UserCreatedContract value) => Handle(value); + + public Task Handle(UserModifiedContract value) => Handle(value); +} diff --git a/src/SubscriberWorker/Program.cs b/src/SubscriberWorker/Program.cs new file mode 100644 index 0000000..93ec808 --- /dev/null +++ b/src/SubscriberWorker/Program.cs @@ -0,0 +1,104 @@ +using System.Text.Json.Serialization; +using Blumchen.DependencyInjection; +using Blumchen.Serialization; +using Blumchen.Subscriptions; +using Commons; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Polly.Retry; +using Polly; +using SubscriberWorker; +using Npgsql; +using Blumchen.Subscriptions.Management; +using Blumchen.Subscriptions.Replication; +using Polly.Registry; + +#pragma warning disable CS8601 // Possible null reference assignment. +Console.Title = typeof(Program).Assembly.GetName().Name; +#pragma warning restore CS8601 // Possible null reference assignment. + +AppDomain.CurrentDomain.UnhandledException += (_, e) => Console.Out.WriteLine(e.ExceptionObject.ToString()); +TaskScheduler.UnobservedTaskException += (_, e) => Console.Out.WriteLine(e.Exception.ToString()); + +var cancellationTokenSource = new CancellationTokenSource(); +var builder = Host.CreateApplicationBuilder(args); + +builder.Services + .AddSingleton, HandleImpl1>() + .AddSingleton, HandleImpl1>() + .AddSingleton, HandleImpl2>() + + .AddSingleton() + + .AddSingleton() + .AddSingleton() + + .AddResiliencePipeline("default", (pipelineBuilder, _) => + pipelineBuilder + .AddRetry(new RetryStrategyOptions + { + BackoffType = DelayBackoffType.Constant, + Delay = TimeSpan.FromSeconds(5), + MaxRetryAttempts = int.MaxValue + }).Build()) + + .AddLogging(loggingBuilder => + { + loggingBuilder + .AddFilter("Microsoft", LogLevel.Warning) + .AddFilter("System", LogLevel.Warning) + .AddFilter("Npgsql", LogLevel.Information) + .AddFilter("Blumchen", LogLevel.Trace) + .AddFilter("SubscriberWorker", LogLevel.Debug) + .AddSimpleConsole(); + }) + .AddTransient(sp => + new NpgsqlDataSourceBuilder(Settings.ConnectionString) + .UseLoggerFactory(sp.GetRequiredService()).Build()) + + .AddBlumchen((provider, workerOptions) => + workerOptions + .Subscription(subscriptionOptions => + subscriptionOptions + .ConnectionString(Settings.ConnectionString) + .DataSource(provider.GetRequiredService()) + .WithReplicationOptions( + new ReplicationSlotManagement.ReplicationSlotOptions($"{nameof(HandleImpl1)}_slot")) + .WithPublicationOptions(new PublicationManagement.PublicationOptions($"{nameof(HandleImpl1)}_pub")) + .WithErrorProcessor(provider.GetRequiredService()) + + .Consumes(provider.GetRequiredService>(), opts => + opts + .Consumes(provider.GetRequiredService>()) + .WithJsonContext(SourceGenerationContext.Default) + .AndNamingPolicy(provider.GetRequiredService())) + ) + .ResiliencyPipeline( + provider.GetRequiredService>().GetPipeline("default")) + .EnableSubscriptionAutoHeal() + ) + .AddBlumchen((provider, workerOptions) => + workerOptions + .Subscription(subscriptionOptions => + subscriptionOptions.ConnectionString(Settings.ConnectionString) + .DataSource(provider.GetRequiredService()) + .WithReplicationOptions( + new ReplicationSlotManagement.ReplicationSlotOptions($"{nameof(HandleImpl2)}_slot")) + .WithPublicationOptions(new PublicationManagement.PublicationOptions($"{nameof(HandleImpl2)}_pub")) + .WithErrorProcessor(provider.GetRequiredService()) + + .Consumes(provider.GetRequiredService>(), opts => + opts + .WithJsonContext(SourceGenerationContext.Default) + .AndNamingPolicy(provider.GetRequiredService()) + )) + .ResiliencyPipeline( + provider.GetRequiredService>().GetPipeline("default")) + .EnableSubscriptionAutoHeal() + ); + +await builder + .Build() + .RunAsync(cancellationTokenSource.Token) + .ConfigureAwait(false); diff --git a/src/SubscriberWorker/SubscriberWorker.csproj b/src/SubscriberWorker/SubscriberWorker.csproj new file mode 100644 index 0000000..17aa463 --- /dev/null +++ b/src/SubscriberWorker/SubscriberWorker.csproj @@ -0,0 +1,23 @@ + + + + Exe + net8.0 + enable + enable + true + true + false + + + + + + + + + + + + + diff --git a/src/Tests/DatabaseFixture.cs b/src/Tests/DatabaseFixture.cs index fc71e14..ba5ee9e 100644 --- a/src/Tests/DatabaseFixture.cs +++ b/src/Tests/DatabaseFixture.cs @@ -4,20 +4,22 @@ using Blumchen; using Blumchen.Database; using Blumchen.Serialization; +using Blumchen.Subscriber; using Blumchen.Subscriptions; using Blumchen.Subscriptions.Management; +using Blumchen.Subscriptions.Replication; +using Microsoft.Extensions.Logging; using Npgsql; using Testcontainers.PostgreSql; using Xunit.Abstractions; namespace Tests; - public abstract class DatabaseFixture(ITestOutputHelper output): IAsyncLifetime { protected ITestOutputHelper Output { get; } = output; - protected readonly Func TimeoutTokenSource = () => new(Debugger.IsAttached ? TimeSpan.FromHours(1) : TimeSpan.FromSeconds(2)); - protected class TestConsumer(Action log, JsonTypeInfo info): IConsumes where T : class + protected readonly Func TimeoutTokenSource = () => new(Debugger.IsAttached ? TimeSpan.FromHours(1) : TimeSpan.FromSeconds(3)); + protected class TestMessageHandler(Action log, JsonTypeInfo info): IMessageHandler where T : class { public async Task Handle(T value) { @@ -33,6 +35,18 @@ public async Task Handle(T value) } } + protected class TestHandler(ILogger> logger): IMessageHandler where T : class + { + private int _counter; + internal int Counter => _counter; + public Task Handle(T value) + { + logger.LogTrace($"Message consumed:{value}"); + Interlocked.Increment(ref _counter); + return Task.CompletedTask; + } + } + protected readonly PostgreSqlContainer Container = new PostgreSqlBuilder() .WithCommand("-c", "wal_level=logical") .Build(); @@ -48,7 +62,7 @@ CancellationToken ct { var tableName = Randomise("outbox"); - var tableDesc = new TableDescriptorBuilder().Name(tableName).Build(); + var tableDesc = new TableDescriptorBuilder().Named(tableName).Build(); await dataSource.EnsureTableExists(tableDesc, ct).ConfigureAwait(false); return tableName; @@ -67,7 +81,7 @@ protected static async Task InsertPoisoningMessage(string connectionString, stri await command.ExecuteNonQueryAsync(ct); } - protected (TestConsumer consumer, SubscriptionOptionsBuilder subscriptionOptionsBuilder) SetupFor( + protected OptionsBuilder SetupFor( string connectionString, string eventsTable, JsonSerializerContext info, @@ -78,28 +92,32 @@ protected static async Task InsertPoisoningMessage(string connectionString, stri { var jsonTypeInfo = info.GetTypeInfo(typeof(T)); ArgumentNullException.ThrowIfNull(jsonTypeInfo); - var consumer = new TestConsumer(log, jsonTypeInfo); - var subscriptionOptionsBuilder = new SubscriptionOptionsBuilder() + var consumer = new TestMessageHandler(log, jsonTypeInfo); + var subscriptionOptionsBuilder = new OptionsBuilder() .WithErrorProcessor(new TestOutErrorProcessor(Output)) + .DataSource(new NpgsqlDataSourceBuilder(connectionString).Build()) .ConnectionString(connectionString) - .JsonContext(info) - .NamingPolicy(namingPolicy) - .Consumes>(consumer) - .WithTable(o => o.Name(eventsTable)) + + .Consumes(consumer, opts => opts + .WithJsonContext(info) + .AndNamingPolicy(namingPolicy)) + + .WithTable(o => o.Named(eventsTable)) .WithPublicationOptions( - new PublicationManagement.PublicationSetupOptions(PublicationName: publicationName ?? Randomise("events_pub")) + new PublicationManagement.PublicationOptions( + PublicationName: publicationName ?? Randomise("events_pub")) ) .WithReplicationOptions( - new ReplicationSlotManagement.ReplicationSlotSetupOptions(slotName ?? Randomise("events_slot")) + new ReplicationSlotManagement.ReplicationSlotOptions(slotName ?? Randomise("events_slot")) ); - return (consumer, subscriptionOptionsBuilder); + return subscriptionOptionsBuilder; } private sealed record TestOutErrorProcessor(ITestOutputHelper Output): IErrorProcessor { - public Func Process => exception => + public Func Process => (exception, id) => { - Output.WriteLine($"record id:{0} resulted in error:{exception.Message}"); + Output.WriteLine($"record id:{id} resulted in error:{exception.Message}"); return Task.CompletedTask; }; } diff --git a/src/Tests/PublisherContext.cs b/src/Tests/PublisherContext.cs index a35dd25..b8bbc3f 100644 --- a/src/Tests/PublisherContext.cs +++ b/src/Tests/PublisherContext.cs @@ -3,13 +3,13 @@ namespace Tests; -[MessageUrn("user-created:v1")] +[MessageRoutedByUrn("user-created:v1")] internal record PublisherUserCreated( Guid Id, string Name ); -[MessageUrn("user-deleted:v1")] +[MessageRoutedByUrn("user-deleted:v1")] internal record PublisherUserDeleted( Guid Id, string Name diff --git a/src/Tests/ServiceCollectionExtensions.cs b/src/Tests/ServiceCollectionExtensions.cs new file mode 100644 index 0000000..f0b1d23 --- /dev/null +++ b/src/Tests/ServiceCollectionExtensions.cs @@ -0,0 +1,15 @@ +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Xunit.Abstractions; + +namespace Tests; + +internal static class ServiceCollectionExtensions +{ + internal static IServiceCollection AddXunitLogging(this IServiceCollection services, ITestOutputHelper output) + => services.AddLogging(loggingBuilder => + loggingBuilder + .AddFilter("Tests", LogLevel.Trace) + .AddXunit(output) + ); +} diff --git a/src/Tests/ServiceWorker..cs b/src/Tests/ServiceWorker..cs new file mode 100644 index 0000000..9b52684 --- /dev/null +++ b/src/Tests/ServiceWorker..cs @@ -0,0 +1,111 @@ +using Blumchen.DependencyInjection; +using Blumchen.Publisher; +using Blumchen.Serialization; +using Blumchen.Subscriber; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Xunit.Abstractions; +using SubscriberOptionsBuilder = Blumchen.Subscriber.OptionsBuilder; +using PublisherOptionsBuilder = Blumchen.Publisher.OptionsBuilder; +using Microsoft.Extensions.Logging; + +namespace Tests +{ + public class ServiceWorker(ITestOutputHelper testOutputHelper): DatabaseFixture(testOutputHelper) + { + [Fact] + public async Task ConsumesRawStrings() => await Consumes( + (services, opts) => opts.ConsumesRawStrings(services.GetRequiredService>()) + ); + + [Fact] + public async Task ConsumesRawObjects() => + await Consumes( + (services,opts) => opts.ConsumesRawObjects(services.GetRequiredService>()) + ); + + [Fact] + public async Task ConsumesRawString() => await Consumes( + (services, opts) => opts.ConsumesRawString(services.GetRequiredService>()) + ); + + [Fact] + public async Task ConsumesRawObject() => await Consumes( + (services, opts) => + { + var handler = services.GetRequiredService>(); + return opts.ConsumesRawObject(handler); + }); + + [Fact] + public async Task ConsumesJson_without_shared_kernel() => await Consumes( + (services, builder) => builder + .Consumes(services.GetRequiredService>(), + opts => opts + .WithJsonContext(SubscriberContext.Default) + .AndNamingPolicy(new AttributeNamingPolicy()) + ) + ); + + [Fact] + public async Task ConsumesJson_with_shared_kernel() + { + var namingPolicy = new FQNNamingPolicy(); + await Consumes( + (services, builder) => builder + .Consumes(services.GetRequiredService>(), + opts => opts + .WithJsonContext(PublisherContext.Default) + .AndNamingPolicy(namingPolicy) + ), namingPolicy + ); + } + + [Fact] + public async Task ConsumesRawString_from_FQNNaming() + { + await Consumes( + (services, builder) => builder + .ConsumesRawString(services.GetRequiredService>() + ), new FQNNamingPolicy() + ); + } + + private async Task Consumes( + Func consumesFn, + INamingPolicy? namingPolicy = default + ) where T : class + { + var ct = TimeoutTokenSource(); + + var options = await new PublisherOptionsBuilder() + .JsonContext(PublisherContext.Default) + .NamingPolicy(namingPolicy ?? new AttributeNamingPolicy()) + .Build() + .EnsureTable(Container.GetConnectionString(), ct.Token); + + await MessageAppender.AppendAsync( + new PublisherUserCreated(Guid.NewGuid(), nameof(PublisherUserCreated)), + options, + Container.GetConnectionString(), + ct.Token + ); + + var builder = Host.CreateApplicationBuilder(); + builder.Services + .AddXunitLogging(Output) + .AddSingleton>() + .AddBlumchen>( + Container.GetConnectionString(), + consumesFn + ); + + using var host = builder.Build(); + var handler = host.Services.GetRequiredService>(); + await host.RunAsync(ct.Token); + Assert.True(handler.Counter > 0); + + } + } +} diff --git a/src/Tests/SubscriberContext.cs b/src/Tests/SubscriberContext.cs index 892d0a8..52012e1 100644 --- a/src/Tests/SubscriberContext.cs +++ b/src/Tests/SubscriberContext.cs @@ -4,7 +4,7 @@ namespace Tests; -[MessageUrn("user-created:v1")] +[MessageRoutedByUrn("user-created:v1")] internal record SubscriberUserCreated( Guid Id, string Name @@ -18,3 +18,9 @@ string Name [JsonSourceGenerationOptions(WriteIndented = true)] [JsonSerializable(typeof(SubscriberUserCreated))] internal partial class SubscriberContext: JsonSerializerContext; + +[RawRoutedByUrn("user-created:v1")] +[RawRoutedByString("Tests.PublisherUserCreated")] +internal class DecoratedContract; + + diff --git a/src/Tests/Tests.csproj b/src/Tests/Tests.csproj index 029d4bc..59e3a57 100644 --- a/src/Tests/Tests.csproj +++ b/src/Tests/Tests.csproj @@ -1,4 +1,4 @@ - + net8.0 @@ -7,14 +7,16 @@ + + - - + + all runtime; build; native; contentfiles; analyzers; buildtransitive - + all runtime; build; native; contentfiles; analyzers; buildtransitive diff --git a/src/Tests/When_Subscription_Already_Exists.cs b/src/Tests/if_subscription_already_exists.cs similarity index 78% rename from src/Tests/When_Subscription_Already_Exists.cs rename to src/Tests/if_subscription_already_exists.cs index 4575b69..19c2a24 100644 --- a/src/Tests/When_Subscription_Already_Exists.cs +++ b/src/Tests/if_subscription_already_exists.cs @@ -1,35 +1,35 @@ -using Blumchen.Publications; +using Blumchen.Publisher; using Blumchen.Serialization; using Blumchen.Subscriptions; using Blumchen.Subscriptions.Management; -using Blumchen.Subscriptions.ReplicationMessageHandlers; +using Blumchen.Subscriptions.Replication; using Npgsql; using Xunit.Abstractions; namespace Tests; // ReSharper disable once InconsistentNaming -public class When_Subscription_Already_Exists(ITestOutputHelper testOutputHelper): DatabaseFixture(testOutputHelper) +public class if_subscription_already_exists(ITestOutputHelper testOutputHelper): DatabaseFixture(testOutputHelper) { [Fact] - public async Task Read_from_transaction_log() + public async Task read_from_transaction_log() { var ct = TimeoutTokenSource().Token; var sharedNamingPolicy = new AttributeNamingPolicy(); var connectionString = Container.GetConnectionString(); var dataSource = NpgsqlDataSource.Create(connectionString); var eventsTable = await CreateOutboxTable(dataSource, ct); - var opts = new PublisherSetupOptionsBuilder() + var opts = new OptionsBuilder() .JsonContext(PublisherContext.Default) .NamingPolicy(sharedNamingPolicy) - .WithTable(o => o.Name(eventsTable)) + .WithTable(o => o.Named(eventsTable)) .Build(); var slotName = "subscription_test"; var publicationName = "publication_test"; await dataSource.CreatePublication(publicationName, eventsTable, new HashSet{"urn:message:user-created:v1"}, ct); - var (_, subscriptionOptions) = SetupFor(connectionString, eventsTable, + var subscriptionOptions = SetupFor(connectionString, eventsTable, SubscriberContext.Default, sharedNamingPolicy, Output.WriteLine, publicationName: publicationName, slotName: slotName); //subscriber ignored msg @@ -46,7 +46,7 @@ await MessageAppender.AppendAsync( var subscription = new Subscription(); await using var subscription1 = subscription.ConfigureAwait(false); - await foreach (var envelope in subscription.Subscribe(_ => subscriptionOptions, null, ct).ConfigureAwait(false)) + await foreach (var envelope in subscription.Subscribe(_ => subscriptionOptions, ct).ConfigureAwait(false)) { Assert.Equal(@expected, ((OkEnvelope)envelope).Value); return; diff --git a/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Empty.cs b/src/Tests/if_subscription_does_not_exist_and_table_is_empty.cs similarity index 72% rename from src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Empty.cs rename to src/Tests/if_subscription_does_not_exist_and_table_is_empty.cs index fd378d2..2f74ecd 100644 --- a/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Empty.cs +++ b/src/Tests/if_subscription_does_not_exist_and_table_is_empty.cs @@ -1,28 +1,27 @@ -using Blumchen; -using Blumchen.Publications; +using Blumchen.Publisher; using Blumchen.Serialization; using Blumchen.Subscriptions; -using Blumchen.Subscriptions.ReplicationMessageHandlers; +using Blumchen.Subscriptions.Replication; using Npgsql; using Xunit.Abstractions; namespace Tests; // ReSharper disable once InconsistentNaming -public class When_Subscription_Does_Not_Exist_And_Table_Is_Empty(ITestOutputHelper testOutputHelper): DatabaseFixture(testOutputHelper) +public class if_subscription_does_not_exist_and_table_is_empty(ITestOutputHelper testOutputHelper): DatabaseFixture(testOutputHelper) { [Fact] - public async Task Read_from_table_using_named_transaction_snapshot() + public async Task read_from_table_using_named_transaction_snapshot() { var ct = TimeoutTokenSource().Token; var connectionString = Container.GetConnectionString(); var eventsTable = await CreateOutboxTable(NpgsqlDataSource.Create(connectionString), ct); var sharedNamingPolicy = new AttributeNamingPolicy(); - var resolver = new PublisherSetupOptionsBuilder() + var resolver = new OptionsBuilder() .JsonContext(PublisherContext.Default) .NamingPolicy(sharedNamingPolicy) - .WithTable(o => o.Name(eventsTable)) + .WithTable(o => o.Named(eventsTable)) .Build(); //subscriber ignored msg await MessageAppender.AppendAsync(new PublisherUserDeleted(Guid.NewGuid(), Guid.NewGuid().ToString()), resolver, connectionString, ct); @@ -35,11 +34,11 @@ public async Task Read_from_table_using_named_transaction_snapshot() await MessageAppender.AppendAsync(@event, resolver, connectionString, ct); - var ( _, subscriptionOptions) = SetupFor(connectionString, eventsTable, + var subscriptionOptions = SetupFor(connectionString, eventsTable, SubscriberContext.Default, sharedNamingPolicy, Output.WriteLine); var subscription = new Subscription(); await using var subscription1 = subscription.ConfigureAwait(false); - await foreach (var envelope in subscription.Subscribe(_ => subscriptionOptions, null, ct).ConfigureAwait(false)) + await foreach (var envelope in subscription.Subscribe(_ => subscriptionOptions, ct).ConfigureAwait(false)) { Assert.Equal(@expected, ((OkEnvelope)envelope).Value); return; diff --git a/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Not_Empty.cs b/src/Tests/if_subscription_does_not_exist_and_table_is_not_empty.cs similarity index 75% rename from src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Not_Empty.cs rename to src/Tests/if_subscription_does_not_exist_and_table_is_not_empty.cs index 6422396..7a720de 100644 --- a/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Not_Empty.cs +++ b/src/Tests/if_subscription_does_not_exist_and_table_is_not_empty.cs @@ -1,27 +1,27 @@ -using Blumchen.Publications; +using Blumchen.Publisher; using Blumchen.Serialization; using Blumchen.Subscriptions; -using Blumchen.Subscriptions.ReplicationMessageHandlers; +using Blumchen.Subscriptions.Replication; using Npgsql; using Xunit.Abstractions; namespace Tests; // ReSharper disable once InconsistentNaming -public class When_Subscription_Does_Not_Exist_And_Table_Is_Not_Empty(ITestOutputHelper testOutputHelper): DatabaseFixture(testOutputHelper) +public class if_subscription_does_not_exist_and_table_is_not_empty(ITestOutputHelper testOutputHelper): DatabaseFixture(testOutputHelper) { [Fact] - public async Task Read_from_table_using_named_transaction_snapshot() + public async Task read_from_table_using_named_transaction_snapshot() { var ct = TimeoutTokenSource().Token; var sharedNamingPolicy = new AttributeNamingPolicy(); var connectionString = Container.GetConnectionString(); var eventsTable = await CreateOutboxTable(NpgsqlDataSource.Create(connectionString), ct); - var resolver = new PublisherSetupOptionsBuilder() + var resolver = new OptionsBuilder() .JsonContext(PublisherContext.Default) .NamingPolicy(sharedNamingPolicy) - .WithTable(o => o.Name(eventsTable)) + .WithTable(o => o.Named(eventsTable)) .Build(); //subscriber ignored msg @@ -35,12 +35,12 @@ public async Task Read_from_table_using_named_transaction_snapshot() var @expected = new SubscriberUserCreated(@event.Id, @event.Name); - var ( _, subscriptionOptions) = + var subscriptionOptions = SetupFor(connectionString, eventsTable, SubscriberContext.Default, sharedNamingPolicy, Output.WriteLine); var subscription = new Subscription(); await using var subscription1 = subscription.ConfigureAwait(false); - await foreach (var envelope in subscription.Subscribe(_ => subscriptionOptions, null, ct).ConfigureAwait(false)) + await foreach (var envelope in subscription.Subscribe(_ => subscriptionOptions, ct).ConfigureAwait(false)) { Assert.Equal(@expected, ((OkEnvelope)envelope).Value); return; diff --git a/src/UnitTests/Contracts.cs b/src/UnitTests/Contracts.cs new file mode 100644 index 0000000..e295820 --- /dev/null +++ b/src/UnitTests/Contracts.cs @@ -0,0 +1,31 @@ +using System.Text.Json.Serialization; +using Blumchen.Serialization; + +namespace UnitTests +{ + [MessageRoutedByUrn("user-created:v1")] + public record UserCreatedContract( + Guid Id, + string Name + ); + + [MessageRoutedByUrn("user-registered:v1")] + public record UserRegisteredContract( + Guid Id, + string Name + ); + + [RawRoutedByUrn("user-deleted:v1")] + public class MessageObjects; + + + [RawRoutedByUrn("user-modified:v1")] + internal class MessageString; + + public class InvalidMessage; + + [JsonSourceGenerationOptions(WriteIndented = true)] + [JsonSerializable(typeof(UserCreatedContract))] + [JsonSerializable(typeof(UserRegisteredContract))] + internal partial class SourceGenerationContext: JsonSerializerContext; +} diff --git a/src/UnitTests/UnitTests.csproj b/src/UnitTests/UnitTests.csproj new file mode 100644 index 0000000..055e70f --- /dev/null +++ b/src/UnitTests/UnitTests.csproj @@ -0,0 +1,32 @@ + + + + net8.0 + enable + enable + + + + + + + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + + + + + + + diff --git a/src/UnitTests/Usings.cs b/src/UnitTests/Usings.cs new file mode 100644 index 0000000..2af8a54 --- /dev/null +++ b/src/UnitTests/Usings.cs @@ -0,0 +1,2 @@ +global using Xunit; + diff --git a/src/UnitTests/create_publication.cs b/src/UnitTests/create_publication.cs new file mode 100644 index 0000000..59616f8 --- /dev/null +++ b/src/UnitTests/create_publication.cs @@ -0,0 +1,37 @@ +using static Blumchen.Subscriptions.Management.PublicationManagement; + +namespace UnitTests +{ + public class create_publication + { + [Fact] + public void with_no_publication_filter() + { + var publicationName = "publicationName"; + var tableName = "tableName"; + var sql = $"CREATE PUBLICATION \"{publicationName}\" FOR TABLE {tableName} WITH (publish = 'insert');"; + Assert.Equal(sql, CreatePublication(publicationName, tableName, new HashSet())); + } + + [Fact] + public void with_single_publication_filter() + { + const string publicationName = "publicationName"; + const string tableName = "tableName"; + const string messageType = "messageType"; + var sql = $"CREATE PUBLICATION \"{publicationName}\" FOR TABLE {tableName} WHERE (message_type = '{messageType}') WITH (publish = 'insert');"; + Assert.Equal(sql, CreatePublication(publicationName, tableName, new HashSet { messageType })); + } + + [Fact] + public void with_multiple_publication_filters() + { + const string publicationName = "publicationName"; + const string tableName = "tableName"; + const string messageType1 = "messageType1"; + const string messageType2 = "messageType2"; + var sql = $"CREATE PUBLICATION \"{publicationName}\" FOR TABLE {tableName} WHERE (message_type = '{messageType1}' OR message_type = '{messageType2}') WITH (publish = 'insert');"; + Assert.Equal(sql, CreatePublication(publicationName, tableName, new HashSet { messageType1, messageType2 })); + } + } +} diff --git a/src/UnitTests/message_table_creation.cs b/src/UnitTests/message_table_creation.cs new file mode 100644 index 0000000..378ac84 --- /dev/null +++ b/src/UnitTests/message_table_creation.cs @@ -0,0 +1,51 @@ +using Blumchen; +using FsCheck.Xunit; + +namespace UnitTests +{ + public class message_table_creation + { + [Fact] + public void default_table_descriptor() + { + const string sql = """ + CREATE TABLE IF NOT EXISTS outbox ( + id Bigint PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY, + message_type Varchar(250) NOT NULL, + data Jsonb NOT NULL + ); + """; + + var implicitTableDescriptor = new TableDescriptorBuilder().Build(); + var explicitTableDescriptor = new TableDescriptorBuilder().UseDefaults().Build(); + Assert.Equal(implicitTableDescriptor.ToString(), explicitTableDescriptor.ToString()); + Assert.Equal(sql, implicitTableDescriptor.ToString()); + } + + [Property] + public void with_varying_descriptor( + string tableName, + string idColName, + string messageTypeColName, + int messageTypeColDimension, + string dataColName) + { + var sql = $""" + CREATE TABLE IF NOT EXISTS {tableName} ( + {idColName} Bigint PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY, + {messageTypeColName} Varchar({messageTypeColDimension}) NOT NULL, + {dataColName} Jsonb NOT NULL + ); + """; + + var tableDescriptor = new TableDescriptorBuilder() + .Named(tableName) + .Id(idColName) + .MessageType(messageTypeColName, messageTypeColDimension) + .MessageData(dataColName) + .Build(); + + Assert.Equal(sql, tableDescriptor.ToString()); + } + } +} diff --git a/src/UnitTests/publisher_options_builder.cs b/src/UnitTests/publisher_options_builder.cs new file mode 100644 index 0000000..c80da69 --- /dev/null +++ b/src/UnitTests/publisher_options_builder.cs @@ -0,0 +1,37 @@ +using Blumchen; +using Blumchen.Publisher; +using Blumchen.Serialization; + +namespace UnitTests +{ + public class publisher_options_builder + { + + [Fact] + public void requires_a_method_call_to_JsonContext() + { + var exception = Record.Exception(() => new OptionsBuilder().Build()); + Assert.IsType(exception); + Assert.Equal("`JsonContext` method not called on OptionsBuilder", exception.Message); + } + + [Fact] + public void requires_a_method_call_to_NamingPolicy() + { + var exception = Record.Exception(() => + new OptionsBuilder().JsonContext(SourceGenerationContext.Default).Build()); + Assert.IsType(exception); + Assert.Equal("`NamingPolicy` method not called on OptionsBuilder", exception.Message); + } + + [Fact] + public void has_default_options() + { + var opts = new OptionsBuilder().JsonContext(SourceGenerationContext.Default) + .NamingPolicy(new AttributeNamingPolicy()).Build(); + + Assert.NotNull(opts.JsonTypeResolver); + Assert.Equal(new TableDescriptorBuilder().Build(), opts.TableDescriptor); + } + } +} diff --git a/src/UnitTests/subscriber_options_builder.cs b/src/UnitTests/subscriber_options_builder.cs new file mode 100644 index 0000000..540ff29 --- /dev/null +++ b/src/UnitTests/subscriber_options_builder.cs @@ -0,0 +1,174 @@ +using Blumchen; +using Blumchen.Serialization; +using Blumchen.Subscriber; +using Blumchen.Subscriptions; +using Blumchen.Subscriptions.Replication; +using Npgsql; +using NSubstitute; +using static Blumchen.Subscriptions.Subscription; + +namespace UnitTests +{ + public class subscriber_options_builder + { + private const string ValidConnectionString = + "PORT = 5432; HOST = 127.0.0.1; TIMEOUT = 15; MINPOOLSIZE = 1; MAXPOOLSIZE = 100; COMMANDTIMEOUT = 20; Include Error Detail=True; DATABASE = 'postgres'; PASSWORD = 'postgres'; USER ID = 'postgres';"; + private readonly Func _builder = c => new OptionsBuilder().ConnectionString(c).DataSource(new NpgsqlDataSourceBuilder(c).Build()); + + [Fact] + public void requires_at_least_one_method_call_to_connectionstring() + { + var exception = Record.Exception(() => new OptionsBuilder().Build()); + Assert.IsType(exception); + Assert.Equal($"`{nameof(OptionsBuilder.ConnectionString)}` method not called on {nameof(OptionsBuilder)}", exception.Message); + } + + [Fact] + public void requires_at_most_one_method_call_to_connectionstring() + { + var exception = Record.Exception(() => new OptionsBuilder() + .ConnectionString(ValidConnectionString) + .ConnectionString(ValidConnectionString) + .Build()); + Assert.IsType(exception); + Assert.Equal($"`{nameof(OptionsBuilder.ConnectionString)}` method on {nameof(OptionsBuilder)} called more then once", exception.Message); + } + + [Fact] + public void requires_at_least_one_method_call_to_datasource() + { + var exception = Record.Exception(() => new OptionsBuilder().ConnectionString(ValidConnectionString).Build()); + Assert.IsType(exception); + Assert.Equal($"`{nameof(OptionsBuilder.DataSource)}` method not called on {nameof(OptionsBuilder)}", exception.Message); + } + + [Fact] + public void requires_at_most_one_method_call_to_datasource() + { + var exception = Record.Exception(() => _builder(ValidConnectionString).DataSource(new NpgsqlDataSourceBuilder(ValidConnectionString).Build()).Build()); + Assert.IsType(exception); + Assert.Equal($"`{nameof(OptionsBuilder.DataSource)}` method on {nameof(OptionsBuilder)} called more then once", exception.Message); + } + + + [Fact] + public void requires_at_least_one_method_call_to_consumes() + { + var exception = Record.Exception(() => _builder(ValidConnectionString).Build()); + Assert.IsType(exception); + Assert.Equal($"No `Consumes...` method called on {nameof(OptionsBuilder)}", exception.Message); + } + + [Fact] + public void has_default_options() + { + var messageHandler = Substitute.For>(); + var opts = _builder(ValidConnectionString).ConsumesRawStrings(messageHandler).Build(); + + Assert.NotNull(opts.PublicationOptions); + Assert.Equal(CreateStyle.WhenNotExists, opts.PublicationOptions.CreateStyle); + Assert.False(opts.PublicationOptions.ShouldReAddTablesIfWereRecreated); + Assert.Empty(opts.PublicationOptions.RegisteredTypes); + Assert.Equal(opts.PublicationOptions.PublicationName, opts.PublicationOptions.PublicationName); + Assert.Equal(new TableDescriptorBuilder().Build(), opts.PublicationOptions.TableDescriptor); + + Assert.NotNull(opts.ReplicationOptions); + Assert.Equal($"{TableDescriptorBuilder.MessageTable.DefaultName}_slot", opts.ReplicationOptions.SlotName); + Assert.Equal(CreateStyle.WhenNotExists, opts.ReplicationOptions.CreateStyle); + Assert.False(opts.ReplicationOptions.Binary); + + Assert.IsType(opts.ErrorProcessor); + } + + [Fact] + public void with_ConsumesRawStrings() + { + var messageHandler = Substitute.For>(); + var opts = _builder(ValidConnectionString).ConsumesRawStrings(messageHandler).Build(); + Assert.Equivalent(new Dictionary> { { OptionsBuilder.WildCard, new Tuple(StringReplicationDataMapper.Instance, messageHandler) } }, opts.Registry); + } + + [Fact] + public void with_ConsumesRawObjects() + { + var messageHandler = Substitute.For>(); + var opts = _builder(ValidConnectionString).ConsumesRawObjects(messageHandler).Build(); + Assert.Equivalent(new Dictionary> { { OptionsBuilder.WildCard, new Tuple(ObjectReplicationDataMapper.Instance, messageHandler) } }, opts.Registry); + } + + [Fact] + public void ConsumesRawObjects_cannot_be_mixed_with_other_consuming_strategies() + { + var messageHandler1 = Substitute.For>(); + var messageHandler2 = Substitute.For>(); + var exception = Record.Exception(() => + _builder(ValidConnectionString).ConsumesRawStrings(messageHandler2).ConsumesRawObjects(messageHandler1) + .Build()); + Assert.IsType(exception); + Assert.Equal($"`{nameof(OptionsBuilder.ConsumesRawObjects)}` cannot be mixed with other consuming strategies", exception.Message); + } + + [Fact] + public void ConsumesRawStrings_cannot_be_mixed_with_other_consuming_strategies() + { + var messageHandler1 = Substitute.For>(); + var messageHandler2 = Substitute.For>(); + var exception = Record.Exception(() => + _builder(ValidConnectionString).ConsumesRawObjects(messageHandler1).ConsumesRawStrings(messageHandler2) + .Build()); + Assert.IsType(exception); + Assert.Equal($"`{nameof(OptionsBuilder.ConsumesRawStrings)}` cannot be mixed with other consuming strategies", exception.Message); + } + + [Fact] + public void with_typed_raw_consumer_of_object_requires_RawUrn_decoration() + { + var messageHandler = Substitute.For>(); + var exception = Record.Exception(() => _builder(ValidConnectionString).ConsumesRawObject(messageHandler).Build()); + Assert.IsType(exception); + Assert.Equal($"`{nameof(RawRoutedByUrnAttribute)}` missing on `{nameof(InvalidMessage)}` message type", exception.Message); + } + + [Fact] + public void with_typed_raw_consumer_of_string_requires_RawUrn_decoration() + { + var messageHandler = Substitute.For>(); + var exception = Record.Exception(() => _builder(ValidConnectionString).ConsumesRawString(messageHandler).Build()); + Assert.IsType(exception); + Assert.Equal($"`{nameof(RawRoutedByUrnAttribute)}` missing on `{nameof(InvalidMessage)}` message type", exception.Message); + } + + [Fact] + public void does_not_allow_multiple_registration_of_the_same_typed_consumer() + { + var messageHandler = Substitute.For>(); + var exception = Record.Exception(() => _builder(ValidConnectionString) + .Consumes(messageHandler) + .WithJsonContext(SourceGenerationContext.Default) + .AndNamingPolicy(new AttributeNamingPolicy()) + .Consumes(messageHandler) + .WithJsonContext(SourceGenerationContext.Default) + .AndNamingPolicy(new AttributeNamingPolicy()) + .Build()); + Assert.IsType(exception); + Assert.Equal($"`{nameof(UserCreatedContract)}` was already registered.", exception.Message); + } + + [Fact] + public void with_typed_consumer_allows_only_one_naming_policy_instance() + { + var userCreatedMessageHandler = Substitute.For>(); + var userRegisteredMessageHandler = Substitute.For>(); + var exception = Record.Exception(() => _builder(ValidConnectionString) + .Consumes(userCreatedMessageHandler) + .WithJsonContext(SourceGenerationContext.Default) + .AndNamingPolicy(new AttributeNamingPolicy()) + .Consumes(userRegisteredMessageHandler) + .WithJsonContext(SourceGenerationContext.Default) + .AndNamingPolicy(new AttributeNamingPolicy()) + .Build()); + Assert.IsType(exception); + Assert.Equal($"`{nameof(OptionsBuilder.NamingPolicy)}` method on OptionsBuilder called more then once", exception.Message); + } + } +}