From d2401209f513cbce50f89194d71cfddb93ff5030 Mon Sep 17 00:00:00 2001 From: MASES Public Developers Team <94312179+masesdevelopers@users.noreply.github.com> Date: Wed, 12 Jul 2023 02:17:05 +0200 Subject: [PATCH] Many updates and errors solved (#53) * Added missing file * #52: fix management of serialization * #52: fix some errors, code cleanup and added new options * #22: update to KNet 2.0.1 --- .../Internal/KafkaOptionsExtension.cs | 72 ++++++++++++---- src/KEFCore/Infrastructure/KafkaDbContext.cs | 85 +++++++++++++++++++ src/KEFCore/KEFCore.cs | 5 ++ src/KEFCore/KEFCore.csproj | 4 +- .../Serdes/Internal/IKafkaSerdesEntityType.cs | 10 ++- .../Serdes/Internal/KafkaSerdesEntityType.cs | 62 ++++++++++++-- .../Serdes/Internal/KafkaSerdesFactory.cs | 3 +- src/KEFCore/Storage/Internal/IKafkaCluster.cs | 3 +- src/KEFCore/Storage/Internal/IKafkaTable.cs | 8 +- src/KEFCore/Storage/Internal/KafkaCluster.cs | 46 ++++++---- .../Internal/KafkaStreamsBaseRetriever.cs | 12 ++- .../Internal/KafkaStreamsTableRetriever.cs | 4 +- src/KEFCore/Storage/Internal/KafkaTable.cs | 17 ++-- test/KEFCore.Test/KEFCore.Test.csproj | 2 +- test/KEFCore.Test/Program.cs | 44 ++++++---- 15 files changed, 288 insertions(+), 89 deletions(-) create mode 100644 src/KEFCore/Infrastructure/KafkaDbContext.cs diff --git a/src/KEFCore/Infrastructure/Internal/KafkaOptionsExtension.cs b/src/KEFCore/Infrastructure/Internal/KafkaOptionsExtension.cs index 0dd133a0..e08853fd 100644 --- a/src/KEFCore/Infrastructure/Internal/KafkaOptionsExtension.cs +++ b/src/KEFCore/Infrastructure/Internal/KafkaOptionsExtension.cs @@ -18,6 +18,7 @@ #nullable enable +using Java.Lang; using Java.Util; using MASES.JCOBridge.C2JBridge; using MASES.KNet.Common; @@ -46,6 +47,9 @@ public class KafkaOptionsExtension : IDbContextOptionsExtension private TopicConfigBuilder? _topicConfigBuilder; private DbContextOptionsExtensionInfo? _info; + static Java.Lang.ClassLoader _loader = Java.Lang.ClassLoader.SystemClassLoader; + static Java.Lang.ClassLoader SystemClassLoader => _loader; + public KafkaOptionsExtension() { } @@ -199,18 +203,31 @@ public virtual Properties StreamsOptions(IEntityType entityType) public virtual Properties StreamsOptions(string applicationId) { - var props = new Properties(); - var localCfg = StreamsConfigBuilder.CreateFrom(StreamsConfigBuilder).WithApplicationId(applicationId) - .WithBootstrapServers(BootstrapServers) - .WithDefaultKeySerdeClass(Org.Apache.Kafka.Common.Serialization.Serdes.String().Dyn().getClass()) - .WithDefaultValueSerdeClass(Org.Apache.Kafka.Common.Serialization.Serdes.String().Dyn().getClass()); - - + Properties props = _streamsConfigBuilder ?? new(); + if (props.ContainsKey(StreamsConfig.APPLICATION_ID_CONFIG)) + { + props.Remove(StreamsConfig.APPLICATION_ID_CONFIG); + } props.Put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); + if (props.ContainsKey(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG)) + { + props.Remove(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG); + } props.Put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BootstrapServers); - props.Put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Org.Apache.Kafka.Common.Serialization.Serdes.String().Dyn().getClass()); - props.Put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Org.Apache.Kafka.Common.Serialization.Serdes.String().Dyn().getClass()); - + if (props.ContainsKey(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG)) + { + props.Remove(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG); + } + props.Put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Class.ForName("org.apache.kafka.common.serialization.Serdes$StringSerde", true, SystemClassLoader)); + if (props.ContainsKey(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG)) + { + props.Remove(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG); + } + props.Put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Class.ForName("org.apache.kafka.common.serialization.Serdes$ByteArraySerde", true, SystemClassLoader)); + if (props.ContainsKey(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)) + { + props.Remove(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG); + } props.Put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); return props; @@ -218,13 +235,34 @@ public virtual Properties StreamsOptions(string applicationId) public virtual Properties ProducerOptions() { - Properties props = new(); + Properties props = _producerConfigBuilder ?? new(); + if (props.ContainsKey(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)) + { + props.Remove(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG); + } props.Put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BootstrapServers); - props.Put(ProducerConfig.ACKS_CONFIG, "all"); - props.Put(ProducerConfig.RETRIES_CONFIG, 0); - props.Put(ProducerConfig.LINGER_MS_CONFIG, 1); - props.Put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); - props.Put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + if (!props.ContainsKey(ProducerConfig.ACKS_CONFIG)) + { + props.Put(ProducerConfig.ACKS_CONFIG, "all"); + } + if (!props.ContainsKey(ProducerConfig.RETRIES_CONFIG)) + { + props.Put(ProducerConfig.RETRIES_CONFIG, 0); + } + if (!props.ContainsKey(ProducerConfig.LINGER_MS_CONFIG)) + { + props.Put(ProducerConfig.LINGER_MS_CONFIG, 1); + } + if (props.ContainsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) + { + props.Remove(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG); + } + props.Put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, Class.ForName("org.apache.kafka.common.serialization.StringSerializer", true, SystemClassLoader)); + if (props.ContainsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) + { + props.Remove(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG); + } + props.Put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Class.ForName("org.apache.kafka.common.serialization.StringSerializer", true, SystemClassLoader)); return props; } @@ -263,7 +301,7 @@ public override string LogFragment { if (_logFragment == null) { - var builder = new StringBuilder(); + var builder = new System.Text.StringBuilder(); builder.Append("DataBaseName=").Append(Extension._databaseName).Append(' '); diff --git a/src/KEFCore/Infrastructure/KafkaDbContext.cs b/src/KEFCore/Infrastructure/KafkaDbContext.cs new file mode 100644 index 00000000..959ef782 --- /dev/null +++ b/src/KEFCore/Infrastructure/KafkaDbContext.cs @@ -0,0 +1,85 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +/* +* Copyright 2022 MASES s.r.l. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +* Refer to LICENSE for more information. +*/ + +using MASES.KNet.Common; +using MASES.KNet.Producer; +using MASES.KNet.Streams; + +namespace MASES.EntityFrameworkCore.KNet.Infrastructure; + +/// +/// Allows Kafka specific configuration to be performed on . +/// +public class KafkaDbContext : DbContext +{ + /// + /// The bootstrap servers of the Apache Kafka cluster + /// + public string? BootstrapServers { get; set; } + /// + /// The application id + /// + public string ApplicationId { get; set; } = Guid.NewGuid().ToString(); + /// + /// Database name + /// + public string? DbName { get; set; } + /// + /// Database number of partitions + /// + public int DefaultNumPartitions { get; set; } = 10; + /// + /// Database replication factor + /// + public short DefaultReplicationFactor { get; set; } = 1; + /// + /// Use persistent storage + /// + public bool UsePersistentStorage { get; set; } = false; + /// + /// Use a producer for each Entity + /// + public bool UseProducerByEntity { get; set; } = false; + + public ProducerConfigBuilder? ProducerConfigBuilder { get; set; } + + public StreamsConfigBuilder? StreamsConfigBuilder { get; set; } + + public TopicConfigBuilder? TopicConfigBuilder { get; set; } + + protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder) + { + if (BootstrapServers == null) + { + throw new ArgumentNullException(nameof(BootstrapServers)); + } + + if (DbName == null) throw new ArgumentNullException(nameof(DbName)); + + optionsBuilder.UseKafkaDatabase(ApplicationId, DbName, BootstrapServers, (o) => + { + o.StreamsConfig(StreamsConfigBuilder??o.EmptyStreamsConfigBuilder).WithDefaultNumPartitions(DefaultNumPartitions); + o.WithUsePersistentStorage(UsePersistentStorage); + o.WithProducerByEntity(UseProducerByEntity); + o.WithDefaultReplicationFactor(DefaultReplicationFactor); + }); + } +} diff --git a/src/KEFCore/KEFCore.cs b/src/KEFCore/KEFCore.cs index 865549a2..f743e683 100644 --- a/src/KEFCore/KEFCore.cs +++ b/src/KEFCore/KEFCore.cs @@ -22,5 +22,10 @@ namespace MASES.EntityFrameworkCore.KNet { public class KEFCore : KNetCore { +#if DEBUG + public override bool EnableDebug => true; + + public override bool LogClassPath => true; +#endif } } diff --git a/src/KEFCore/KEFCore.csproj b/src/KEFCore/KEFCore.csproj index 0ff3ccf4..80288fe1 100644 --- a/src/KEFCore/KEFCore.csproj +++ b/src/KEFCore/KEFCore.csproj @@ -63,10 +63,10 @@ - + All None - + diff --git a/src/KEFCore/Serdes/Internal/IKafkaSerdesEntityType.cs b/src/KEFCore/Serdes/Internal/IKafkaSerdesEntityType.cs index e29c5fbf..5fc790bf 100644 --- a/src/KEFCore/Serdes/Internal/IKafkaSerdesEntityType.cs +++ b/src/KEFCore/Serdes/Internal/IKafkaSerdesEntityType.cs @@ -16,17 +16,19 @@ * Refer to LICENSE for more information. */ +using Org.Apache.Kafka.Common.Header; + namespace MASES.EntityFrameworkCore.KNet.Serdes.Internal { public interface IKafkaSerdesEntityType { - string Serialize(params object?[]? args); + string Serialize(Headers headers, params object?[]? args); - string Serialize(TKey key); + string Serialize(Headers headers, TKey key); - object[] Deserialize(string arg); + object[] Deserialize(Headers headers, string arg); - TKey Deserialize(string arg); + TKey Deserialize(Headers headers, string arg); object[] ConvertData(object[]? input); } diff --git a/src/KEFCore/Serdes/Internal/KafkaSerdesEntityType.cs b/src/KEFCore/Serdes/Internal/KafkaSerdesEntityType.cs index 52d0e6a7..ba5806da 100644 --- a/src/KEFCore/Serdes/Internal/KafkaSerdesEntityType.cs +++ b/src/KEFCore/Serdes/Internal/KafkaSerdesEntityType.cs @@ -16,8 +16,14 @@ * Refer to LICENSE for more information. */ +using Org.Apache.Kafka.Common.Header; +using System.Text.Json; +using System.Text.Json.Nodes; +using System.Text.Json.Serialization; + namespace MASES.EntityFrameworkCore.KNet.Serdes.Internal { + [JsonSerializable(typeof(KafkaSerdesEntityTypeData))] public class KafkaSerdesEntityTypeData { public KafkaSerdesEntityTypeData() { } @@ -27,9 +33,10 @@ public KafkaSerdesEntityTypeData(string tName, object[] rData) typeName = tName; data = rData; } - - public string? typeName; - public object[]? data; + [JsonInclude()] + public string typeName; + [JsonInclude()] + public object[] data; } public class KafkaSerdesEntityType : IKafkaSerdesEntityType @@ -43,27 +50,64 @@ public KafkaSerdesEntityType(IEntityType type) _properties = _type.GetProperties().ToArray(); } - public object[] Deserialize(string arg) + public object[] Deserialize(Headers headers, string arg) { var des = GetFullType(arg); return ConvertData(des!.data); } - public TKey Deserialize(string arg) => System.Text.Json.JsonSerializer.Deserialize(arg)!; + public TKey Deserialize(Headers headers, string arg) => System.Text.Json.JsonSerializer.Deserialize(arg)!; - public string Serialize(params object?[]? args) => System.Text.Json.JsonSerializer.Serialize(new KafkaSerdesEntityTypeData(_type.Name, args!)); + public string Serialize(Headers headers, params object?[]? args) => System.Text.Json.JsonSerializer.Serialize(new KafkaSerdesEntityTypeData(_type.Name, args!)); - public string Serialize(TKey key) => System.Text.Json.JsonSerializer.Serialize(key); + public string Serialize(Headers headers, TKey key) => System.Text.Json.JsonSerializer.Serialize(key); public static KafkaSerdesEntityTypeData? GetFullType(string arg) => System.Text.Json.JsonSerializer.Deserialize(arg); public object[] ConvertData(object[]? input) { + if (input == null) return null; + List data = new List(); + for (int i = 0; i < input!.Length; i++) { - input[i] = Convert.ChangeType(input[i], _properties[i].ClrType); + if (input[i] is JsonElement elem) + { + switch (elem.ValueKind) + { + case JsonValueKind.Undefined: + break; + case JsonValueKind.Object: + break; + case JsonValueKind.Array: + break; + case JsonValueKind.String: + data.Add(elem.GetString()); + break; + case JsonValueKind.Number: + var tmp = elem.GetInt64(); + data.Add(Convert.ChangeType(tmp, _properties[i].ClrType)); + break; + case JsonValueKind.True: + data.Add(true); + break; + case JsonValueKind.False: + data.Add(false); + break; + case JsonValueKind.Null: + data.Add(null); + break; + default: + break; + } + + } + else + { + data.Add(Convert.ChangeType(input[i], _properties[i].ClrType)); + } } - return input; + return data.ToArray(); } } } diff --git a/src/KEFCore/Serdes/Internal/KafkaSerdesFactory.cs b/src/KEFCore/Serdes/Internal/KafkaSerdesFactory.cs index b331885a..6ad829d9 100644 --- a/src/KEFCore/Serdes/Internal/KafkaSerdesFactory.cs +++ b/src/KEFCore/Serdes/Internal/KafkaSerdesFactory.cs @@ -43,8 +43,7 @@ public virtual IKafkaSerdesEntityType Get(string typeName) public virtual object[] Deserialize(byte[] data) { var str = Encoding.UTF8.GetString(data); - var fulltype = KafkaSerdesEntityType.GetFullType(str); - return Get(fulltype!.typeName!).ConvertData(fulltype.data); + return Deserialize(str); } public virtual object[] Deserialize(string data) diff --git a/src/KEFCore/Storage/Internal/IKafkaCluster.cs b/src/KEFCore/Storage/Internal/IKafkaCluster.cs index c0c0fade..5cdaa59a 100644 --- a/src/KEFCore/Storage/Internal/IKafkaCluster.cs +++ b/src/KEFCore/Storage/Internal/IKafkaCluster.cs @@ -20,6 +20,7 @@ using MASES.EntityFrameworkCore.KNet.Serdes.Internal; using MASES.EntityFrameworkCore.KNet.ValueGeneration.Internal; using MASES.KNet.Producer; +using Org.Apache.Kafka.Clients.Producer; namespace MASES.EntityFrameworkCore.KNet.Storage.Internal; @@ -45,7 +46,7 @@ bool EnsureConnected( IKafkaSerdesEntityType CreateSerdes(IEntityType entityType); - IKNetProducer CreateProducer(IEntityType entityType); + IProducer CreateProducer(IEntityType entityType); IEnumerable GetData(IEntityType entityType); diff --git a/src/KEFCore/Storage/Internal/IKafkaTable.cs b/src/KEFCore/Storage/Internal/IKafkaTable.cs index edb33bfb..78e97717 100644 --- a/src/KEFCore/Storage/Internal/IKafkaTable.cs +++ b/src/KEFCore/Storage/Internal/IKafkaTable.cs @@ -33,13 +33,13 @@ public interface IKafkaTable IEnumerable Rows { get; } - KNetProducerRecord Create(IUpdateEntry entry); + ProducerRecord Create(IUpdateEntry entry); - KNetProducerRecord Delete(IUpdateEntry entry); + ProducerRecord Delete(IUpdateEntry entry); - KNetProducerRecord Update(IUpdateEntry entry); + ProducerRecord Update(IUpdateEntry entry); - IEnumerable> Commit(IEnumerable> records); + IEnumerable> Commit(IEnumerable> records); KafkaIntegerValueGenerator GetIntegerValueGenerator( IProperty property, diff --git a/src/KEFCore/Storage/Internal/KafkaCluster.cs b/src/KEFCore/Storage/Internal/KafkaCluster.cs index 0539f4ee..65528daf 100644 --- a/src/KEFCore/Storage/Internal/KafkaCluster.cs +++ b/src/KEFCore/Storage/Internal/KafkaCluster.cs @@ -30,6 +30,8 @@ using Org.Apache.Kafka.Common.Config; using Org.Apache.Kafka.Clients.Producer; using Org.Apache.Kafka.Common.Errors; +using MASES.KNet.Serialization; +using MASES.KNet.Extensions; namespace MASES.EntityFrameworkCore.KNet.Storage.Internal; @@ -45,8 +47,8 @@ public class KafkaCluster : IKafkaCluster private System.Collections.Generic.Dictionary? _tables; - private IKNetProducer? _globalProducer = null; - private readonly ConcurrentDictionary> _producers; + private IProducer? _globalProducer = null; + private readonly ConcurrentDictionary> _producers; public KafkaCluster( KafkaOptionsExtension options, @@ -160,24 +162,36 @@ public virtual bool CreateTable(IEntityType entityType) { try { - var topic = new NewTopic(entityType.TopicName(_options), entityType.NumPartitions(_options), entityType.ReplicationFactor(_options)); - var map = Collections.SingletonMap(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT); - topic.Configs(map); - var coll = Collections.Singleton(topic); - var result = _kafkaAdminClient.CreateTopics(coll); - result.All().Get(); + try + { + var topic = new NewTopic(entityType.TopicName(_options), entityType.NumPartitions(_options), entityType.ReplicationFactor(_options)); + _options.TopicConfigBuilder.CleanupPolicy = MASES.KNet.Common.TopicConfigBuilder.CleanupPolicyTypes.Compact | MASES.KNet.Common.TopicConfigBuilder.CleanupPolicyTypes.Delete; + var map = _options.TopicConfigBuilder.ToMap(); + topic.Configs(map); + var coll = Collections.Singleton(topic); + var result = _kafkaAdminClient.CreateTopics(coll); + result.All().Get(); + } + catch (Java.Util.Concurrent.ExecutionException ex) + { + throw ex.InnerException; + } } - catch (Java.Util.Concurrent.ExecutionException ex) + catch (TopicExistsException ex) { + if (ex.Message.Contains("deletion")) + { + Thread.Sleep(1000); // wait a while to complete topic deletion + return CreateTable(entityType); + } return false; } - return true; } public virtual IKafkaSerdesEntityType CreateSerdes(IEntityType entityType) => _serdesFactory.GetOrCreate(entityType); - public virtual IKNetProducer CreateProducer(IEntityType entityType) + public virtual IProducer CreateProducer(IEntityType entityType) { if (!Options.ProducerByEntity) { @@ -193,7 +207,7 @@ public virtual IKNetProducer CreateProducer(IEntityType entityTy } } - private IKNetProducer CreateProducer() => new KNetProducer(Options.ProducerOptions()); + private IProducer CreateProducer() => new KafkaProducer(Options.ProducerOptions()); private static System.Collections.Generic.Dictionary CreateTables() => new(); @@ -226,7 +240,7 @@ public virtual int ExecuteTransaction( IDiagnosticsLogger updateLogger) { var rowsAffected = 0; - System.Collections.Generic.Dictionary>> _dataInTransaction = new(); + System.Collections.Generic.Dictionary>> _dataInTransaction = new(); lock (_lock) { @@ -240,7 +254,7 @@ public virtual int ExecuteTransaction( var table = EnsureTable(entityType); - KNetProducerRecord record; + ProducerRecord record; if (entry.SharedIdentityEntry != null) { @@ -267,9 +281,9 @@ record = table.Update(entry); continue; } - if (!_dataInTransaction.TryGetValue(table, out System.Collections.Generic.IList>? recordList)) + if (!_dataInTransaction.TryGetValue(table, out System.Collections.Generic.IList>? recordList)) { - recordList = new System.Collections.Generic.List>(); + recordList = new System.Collections.Generic.List>(); _dataInTransaction[table] = recordList; } recordList?.Add(record); diff --git a/src/KEFCore/Storage/Internal/KafkaStreamsBaseRetriever.cs b/src/KEFCore/Storage/Internal/KafkaStreamsBaseRetriever.cs index 61aeec5a..462cedb2 100644 --- a/src/KEFCore/Storage/Internal/KafkaStreamsBaseRetriever.cs +++ b/src/KEFCore/Storage/Internal/KafkaStreamsBaseRetriever.cs @@ -18,6 +18,7 @@ #nullable enable +using MASES.JCOBridge.C2JBridge; using Org.Apache.Kafka.Common.Utils; using Org.Apache.Kafka.Streams; using Org.Apache.Kafka.Streams.Errors; @@ -133,10 +134,7 @@ private void StartTopology(StreamsBuilder builder, KStream root) resetEvent.WaitOne(); // wait running state if (resultException != null) throw resultException; - if (keyValueStore == null) - { - keyValueStore = streams?.Store(StoreQueryParameters>.FromNameAndType(_storageId, QueryableStoreTypes.KeyValueStore())); - } + keyValueStore ??= streams?.Store(StoreQueryParameters>.FromNameAndType(_storageId, QueryableStoreTypes.KeyValueStore())); } public IEnumerator GetEnumerator() @@ -175,7 +173,7 @@ public KafkaEnumerator(IKafkaCluster kafkaCluster, ReadOnlyKeyValueStore? _keyValueStore = keyValueStore; Trace.WriteLine($"KafkaEnumerator - ApproximateNumEntries {_keyValueStore?.ApproximateNumEntries()}"); keyValueIterator = _keyValueStore?.All(); - keyValueEnumerator = keyValueIterator?.GetEnumerator(); + keyValueEnumerator = keyValueIterator?.ToIEnumerator(); } public ValueBuffer Current @@ -186,7 +184,7 @@ public ValueBuffer Current { var kv = keyValueEnumerator.Current; object? v = kv.value; - var data = _kafkaCluster.SerdesFactory.Deserialize(v as string); + var data = _kafkaCluster.SerdesFactory.Deserialize(v as byte[]); return new ValueBuffer(data); } throw new InvalidOperationException("InvalidEnumerator"); @@ -210,7 +208,7 @@ public void Reset() { keyValueIterator?.Dispose(); keyValueIterator = _keyValueStore?.All(); - keyValueEnumerator = keyValueIterator?.GetEnumerator(); + keyValueEnumerator = keyValueIterator?.ToIEnumerator(); } } } diff --git a/src/KEFCore/Storage/Internal/KafkaStreamsTableRetriever.cs b/src/KEFCore/Storage/Internal/KafkaStreamsTableRetriever.cs index 5537f469..3776a884 100644 --- a/src/KEFCore/Storage/Internal/KafkaStreamsTableRetriever.cs +++ b/src/KEFCore/Storage/Internal/KafkaStreamsTableRetriever.cs @@ -22,7 +22,7 @@ namespace MASES.EntityFrameworkCore.KNet.Storage.Internal { - public sealed class KafkaStreamsTableRetriever : KafkaStreamsBaseRetriever + public sealed class KafkaStreamsTableRetriever : KafkaStreamsBaseRetriever { public KafkaStreamsTableRetriever(IKafkaCluster kafkaCluster, IEntityType entityType) : this(kafkaCluster, entityType, new StreamsBuilder()) @@ -30,7 +30,7 @@ public KafkaStreamsTableRetriever(IKafkaCluster kafkaCluster, IEntityType entity } public KafkaStreamsTableRetriever(IKafkaCluster kafkaCluster, IEntityType entityType, StreamsBuilder builder) - : base(kafkaCluster, entityType, entityType.StorageIdForTable(kafkaCluster.Options), builder, builder.Stream(entityType.TopicName(kafkaCluster.Options))) + : base(kafkaCluster, entityType, entityType.StorageIdForTable(kafkaCluster.Options), builder, builder.Stream(entityType.TopicName(kafkaCluster.Options))) { } } diff --git a/src/KEFCore/Storage/Internal/KafkaTable.cs b/src/KEFCore/Storage/Internal/KafkaTable.cs index 7ddf7670..d91ef575 100644 --- a/src/KEFCore/Storage/Internal/KafkaTable.cs +++ b/src/KEFCore/Storage/Internal/KafkaTable.cs @@ -26,6 +26,8 @@ using MASES.EntityFrameworkCore.KNet.Serdes.Internal; using MASES.KNet.Producer; using Org.Apache.Kafka.Clients.Producer; +using Org.Apache.Kafka.Common.Header; +using Org.Apache.Kafka.Connect.Transforms; namespace MASES.EntityFrameworkCore.KNet.Storage.Internal; @@ -40,7 +42,7 @@ public class KafkaTable : IKafkaTable private Dictionary? _integerGenerators; - private readonly IKNetProducer _kafkaProducer; + private readonly IProducer _kafkaProducer; private readonly string _tableAssociatedTopicName; private readonly IKafkaSerdesEntityType _serdes; private readonly KafkaStreamsTableRetriever _streamData; @@ -157,7 +159,7 @@ public virtual IEnumerable Rows private static System.Collections.Generic.List GetKeyComparers(IEnumerable properties) => properties.Select(p => p.GetKeyValueComparer()).ToList(); - public virtual KNetProducerRecord Create(IUpdateEntry entry) + public virtual ProducerRecord Create(IUpdateEntry entry) { var properties = entry.EntityType.GetProperties().ToList(); var row = new object?[properties.Count]; @@ -184,7 +186,7 @@ public virtual KNetProducerRecord Create(IUpdateEntry entry) return NewRecord(entry, key, row); } - public virtual KNetProducerRecord Delete(IUpdateEntry entry) + public virtual ProducerRecord Delete(IUpdateEntry entry) { var key = CreateKey(entry); @@ -244,7 +246,7 @@ private static bool IsConcurrencyConflict( return false; } - public virtual KNetProducerRecord Update(IUpdateEntry entry) + public virtual ProducerRecord Update(IUpdateEntry entry) { var key = CreateKey(entry); @@ -295,7 +297,7 @@ public virtual KNetProducerRecord Update(IUpdateEntry entry) } } - public virtual IEnumerable> Commit(IEnumerable> records) + public virtual IEnumerable> Commit(IEnumerable> records) { System.Collections.Generic.List> futures = new(); foreach (var record in records) @@ -320,9 +322,10 @@ public virtual void BumpValueGenerators(object?[] row) } } - private KNetProducerRecord NewRecord(IUpdateEntry entry, TKey key, object?[]? row) + private ProducerRecord NewRecord(IUpdateEntry entry, TKey key, object?[]? row) { - var record = new KNetProducerRecord(_tableAssociatedTopicName, _serdes.Serialize(key), _serdes.Serialize(row)); + Headers headers = Headers.Create(); + var record = new ProducerRecord(_tableAssociatedTopicName, 0, new System.DateTimeOffset(DateTime.Now).ToUnixTimeMilliseconds(), _serdes.Serialize(headers, key), _serdes.Serialize(headers, row), headers); return record; } diff --git a/test/KEFCore.Test/KEFCore.Test.csproj b/test/KEFCore.Test/KEFCore.Test.csproj index 589bb68a..eefaacbf 100644 --- a/test/KEFCore.Test/KEFCore.Test.csproj +++ b/test/KEFCore.Test/KEFCore.Test.csproj @@ -11,6 +11,6 @@ - + diff --git a/test/KEFCore.Test/Program.cs b/test/KEFCore.Test/Program.cs index 8a7117be..ba9fc215 100644 --- a/test/KEFCore.Test/Program.cs +++ b/test/KEFCore.Test/Program.cs @@ -22,6 +22,8 @@ * SOFTWARE. */ +using MASES.EntityFrameworkCore.KNet.Infrastructure; +using MASES.KNet.Streams; using Microsoft.EntityFrameworkCore; using System.Collections.Generic; using System.Diagnostics; @@ -44,7 +46,16 @@ static void Main(string[] args) serverToUse = args[0]; } - using (var context = new BloggingContext(serverToUse)) + var streamConfig = StreamsConfigBuilder.Create(); + streamConfig = streamConfig.WithAcceptableRecoveryLag(100); + + using (var context = new BloggingContext() + { + BootstrapServers = serverToUse, + ApplicationId = "TestApplication", + DbName = "TestDB", + StreamsConfigBuilder = streamConfig, + }) { context.Database.EnsureDeleted(); context.Database.EnsureCreated(); @@ -66,10 +77,15 @@ static void Main(string[] args) }); } context.SaveChanges(); - } - using (var context = new BloggingContext(serverToUse)) + using (var context = new BloggingContext() + { + BootstrapServers = serverToUse, + ApplicationId = "TestApplication", + DbName = "TestDB", + StreamsConfigBuilder = streamConfig, + }) { //var pageObject = (from op in context.Blogs @@ -102,24 +118,18 @@ static void Main(string[] args) } } - public class BloggingContext : DbContext + public class BloggingContext : KafkaDbContext { - readonly string _serverToUse; - public BloggingContext(string serverToUse) - { - _serverToUse = serverToUse; - } - public DbSet Blogs { get; set; } public DbSet Posts { get; set; } - protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder) - { - optionsBuilder.UseKafkaDatabase("TestApplication", "TestDB", _serverToUse, (o) => - { - o.StreamsConfig(o.EmptyStreamsConfigBuilder.WithAcceptableRecoveryLag(100)).WithDefaultNumPartitions(10); - }); - } + //protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder) + //{ + // optionsBuilder.UseKafkaDatabase(ApplicationId, DbName, BootstrapServers, (o) => + // { + // o.StreamsConfig(o.EmptyStreamsConfigBuilder.WithAcceptableRecoveryLag(100)).WithDefaultNumPartitions(10); + // }); + //} //protected override void OnModelCreating(ModelBuilder modelBuilder) //{