Many updates and errors fixed (#53)
* Added missing file

* #52: fix serialization management

* #52: fix some errors, clean up code, and add new options

* #22: update to KNet 2.0.1
masesdevelopers committed Jul 12, 2023
1 parent 6ed45d1 commit d240120
Showing 15 changed files with 288 additions and 89 deletions.
72 changes: 55 additions & 17 deletions src/KEFCore/Infrastructure/Internal/KafkaOptionsExtension.cs
@@ -18,6 +18,7 @@

#nullable enable

using Java.Lang;
using Java.Util;
using MASES.JCOBridge.C2JBridge;
using MASES.KNet.Common;
@@ -46,6 +47,9 @@ public class KafkaOptionsExtension : IDbContextOptionsExtension
private TopicConfigBuilder? _topicConfigBuilder;
private DbContextOptionsExtensionInfo? _info;

static Java.Lang.ClassLoader _loader = Java.Lang.ClassLoader.SystemClassLoader;
static Java.Lang.ClassLoader SystemClassLoader => _loader;

public KafkaOptionsExtension()
{
}
@@ -199,32 +203,66 @@ public virtual Properties StreamsOptions(IEntityType entityType)

public virtual Properties StreamsOptions(string applicationId)
{
var props = new Properties();
var localCfg = StreamsConfigBuilder.CreateFrom(StreamsConfigBuilder).WithApplicationId(applicationId)
.WithBootstrapServers(BootstrapServers)
.WithDefaultKeySerdeClass(Org.Apache.Kafka.Common.Serialization.Serdes.String().Dyn().getClass())
.WithDefaultValueSerdeClass(Org.Apache.Kafka.Common.Serialization.Serdes.String().Dyn().getClass());


Properties props = _streamsConfigBuilder ?? new();
if (props.ContainsKey(StreamsConfig.APPLICATION_ID_CONFIG))
{
props.Remove(StreamsConfig.APPLICATION_ID_CONFIG);
}
props.Put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
if (props.ContainsKey(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG))
{
props.Remove(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
}
props.Put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BootstrapServers);
props.Put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Org.Apache.Kafka.Common.Serialization.Serdes.String().Dyn().getClass());
props.Put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Org.Apache.Kafka.Common.Serialization.Serdes.String().Dyn().getClass());

if (props.ContainsKey(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG))
{
props.Remove(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG);
}
props.Put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Class.ForName("org.apache.kafka.common.serialization.Serdes$StringSerde", true, SystemClassLoader));
if (props.ContainsKey(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG))
{
props.Remove(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG);
}
props.Put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Class.ForName("org.apache.kafka.common.serialization.Serdes$ByteArraySerde", true, SystemClassLoader));
if (props.ContainsKey(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG))
{
props.Remove(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
}
props.Put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

return props;
}

public virtual Properties ProducerOptions()
{
Properties props = new();
Properties props = _producerConfigBuilder ?? new();
if (props.ContainsKey(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))
{
props.Remove(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
}
props.Put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BootstrapServers);
props.Put(ProducerConfig.ACKS_CONFIG, "all");
props.Put(ProducerConfig.RETRIES_CONFIG, 0);
props.Put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.Put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.Put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
if (!props.ContainsKey(ProducerConfig.ACKS_CONFIG))
{
props.Put(ProducerConfig.ACKS_CONFIG, "all");
}
if (!props.ContainsKey(ProducerConfig.RETRIES_CONFIG))
{
props.Put(ProducerConfig.RETRIES_CONFIG, 0);
}
if (!props.ContainsKey(ProducerConfig.LINGER_MS_CONFIG))
{
props.Put(ProducerConfig.LINGER_MS_CONFIG, 1);
}
if (props.ContainsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG))
{
props.Remove(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
}
props.Put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, Class.ForName("org.apache.kafka.common.serialization.StringSerializer", true, SystemClassLoader));
if (props.ContainsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG))
{
props.Remove(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
}
props.Put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Class.ForName("org.apache.kafka.common.serialization.StringSerializer", true, SystemClassLoader));

return props;
}
@@ -263,7 +301,7 @@ public override string LogFragment
{
if (_logFragment == null)
{
var builder = new StringBuilder();
var builder = new System.Text.StringBuilder();

builder.Append("DataBaseName=").Append(Extension._databaseName).Append(' ');

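Both option factories above now follow the same merge rule over the user-supplied config builder: keys the provider must own (bootstrap servers, application id, serde/serializer classes) are overwritten, while tunables such as acks, retries and linger are only defaulted when the user has not set them. Below is a minimal sketch of that pattern, limited to string-valued keys and to the Properties calls already used in this diff; the ForceKey/DefaultIfAbsent helper names are illustrative, not part of this commit, and running it requires the KNet/JCOBridge runtime behind the Java.Util types.

using Java.Util;
using Org.Apache.Kafka.Clients.Producer;

static class KafkaPropertiesSketch
{
    // Overwrite a key the provider must control, even if the user configured it.
    static void ForceKey(Properties props, string key, string value)
    {
        if (props.ContainsKey(key)) props.Remove(key);
        props.Put(key, value);
    }

    // Apply a default only when the user left the key unset.
    static void DefaultIfAbsent(Properties props, string key, string value)
    {
        if (!props.ContainsKey(key)) props.Put(key, value);
    }

    static void Main()
    {
        var props = new Properties();                // stands in for the user's config builder
        props.Put(ProducerConfig.ACKS_CONFIG, "1");  // user preference
        ForceKey(props, ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // provider-owned, always overwritten
        DefaultIfAbsent(props, ProducerConfig.ACKS_CONFIG, "all");                  // skipped: the user already set it
    }
}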
85 changes: 85 additions & 0 deletions src/KEFCore/Infrastructure/KafkaDbContext.cs
@@ -0,0 +1,85 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

/*
* Copyright 2022 MASES s.r.l.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Refer to LICENSE for more information.
*/

using MASES.KNet.Common;
using MASES.KNet.Producer;
using MASES.KNet.Streams;

namespace MASES.EntityFrameworkCore.KNet.Infrastructure;

/// <summary>
/// Allows Kafka specific configuration to be performed on <see cref="DbContext" />.
/// </summary>
public class KafkaDbContext : DbContext
{
/// <summary>
/// The bootstrap servers of the Apache Kafka cluster
/// </summary>
public string? BootstrapServers { get; set; }
/// <summary>
/// The application id
/// </summary>
public string ApplicationId { get; set; } = Guid.NewGuid().ToString();
/// <summary>
/// Database name
/// </summary>
public string? DbName { get; set; }
/// <summary>
/// Database number of partitions
/// </summary>
public int DefaultNumPartitions { get; set; } = 10;
/// <summary>
/// Database replication factor
/// </summary>
public short DefaultReplicationFactor { get; set; } = 1;
/// <summary>
/// Use persistent storage
/// </summary>
public bool UsePersistentStorage { get; set; } = false;
/// <summary>
/// Use a producer for each Entity
/// </summary>
public bool UseProducerByEntity { get; set; } = false;

public ProducerConfigBuilder? ProducerConfigBuilder { get; set; }

public StreamsConfigBuilder? StreamsConfigBuilder { get; set; }

public TopicConfigBuilder? TopicConfigBuilder { get; set; }

protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
{
if (BootstrapServers == null)
{
throw new ArgumentNullException(nameof(BootstrapServers));
}

if (DbName == null) throw new ArgumentNullException(nameof(DbName));

optionsBuilder.UseKafkaDatabase(ApplicationId, DbName, BootstrapServers, (o) =>
{
o.StreamsConfig(StreamsConfigBuilder??o.EmptyStreamsConfigBuilder).WithDefaultNumPartitions(DefaultNumPartitions);
o.WithUsePersistentStorage(UsePersistentStorage);
o.WithProducerByEntity(UseProducerByEntity);
o.WithDefaultReplicationFactor(DefaultReplicationFactor);
});
}
}
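The new KafkaDbContext base class wires UseKafkaDatabase inside OnConfiguring, so a consuming context only has to set the connection properties and declare its DbSets. A minimal sketch of how such a context might be used; the BloggingContext/Blog types and the broker address are placeholders for illustration, not part of this commit, and a reachable Apache Kafka cluster is assumed.

using Microsoft.EntityFrameworkCore;
using MASES.EntityFrameworkCore.KNet.Infrastructure;

// Placeholder entity for the sketch.
public class Blog
{
    public int BlogId { get; set; }
    public string Url { get; set; } = string.Empty;
}

// Derive from KafkaDbContext and let its OnConfiguring call UseKafkaDatabase.
public class BloggingContext : KafkaDbContext
{
    public DbSet<Blog> Blogs => Set<Blog>();
}

class Program
{
    static void Main()
    {
        using var context = new BloggingContext
        {
            BootstrapServers = "localhost:9092", // placeholder broker address
            DbName = "BloggingDb",               // database name passed to UseKafkaDatabase
            UsePersistentStorage = true,
        };
        context.Blogs.Add(new Blog { Url = "https://example.com" });
        context.SaveChanges();
    }
}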
5 changes: 5 additions & 0 deletions src/KEFCore/KEFCore.cs
@@ -22,5 +22,10 @@ namespace MASES.EntityFrameworkCore.KNet
{
public class KEFCore : KNetCore<KEFCore>
{
#if DEBUG
public override bool EnableDebug => true;

public override bool LogClassPath => true;
#endif
}
}
4 changes: 2 additions & 2 deletions src/KEFCore/KEFCore.csproj
@@ -63,10 +63,10 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="MASES.KNet" Version="2.0.0">
<PackageReference Include="MASES.KNet" Version="2.0.1">
<IncludeAssets>All</IncludeAssets>
<PrivateAssets>None</PrivateAssets>
</PackageReference>
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="7.0.8" PrivateAssets="none" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="7.0.9" PrivateAssets="none" />
</ItemGroup>
</Project>
10 changes: 6 additions & 4 deletions src/KEFCore/Serdes/Internal/IKafkaSerdesEntityType.cs
@@ -16,17 +16,19 @@
* Refer to LICENSE for more information.
*/

using Org.Apache.Kafka.Common.Header;

namespace MASES.EntityFrameworkCore.KNet.Serdes.Internal
{
public interface IKafkaSerdesEntityType
{
string Serialize(params object?[]? args);
string Serialize(Headers headers, params object?[]? args);

string Serialize<TKey>(TKey key);
string Serialize<TKey>(Headers headers, TKey key);

object[] Deserialize(string arg);
object[] Deserialize(Headers headers, string arg);

TKey Deserialize<TKey>(string arg);
TKey Deserialize<TKey>(Headers headers, string arg);

object[] ConvertData(object[]? input);
}
62 changes: 53 additions & 9 deletions src/KEFCore/Serdes/Internal/KafkaSerdesEntityType.cs
@@ -16,8 +16,14 @@
* Refer to LICENSE for more information.
*/

using Org.Apache.Kafka.Common.Header;
using System.Text.Json;
using System.Text.Json.Nodes;
using System.Text.Json.Serialization;

namespace MASES.EntityFrameworkCore.KNet.Serdes.Internal
{
[JsonSerializable(typeof(KafkaSerdesEntityTypeData))]
public class KafkaSerdesEntityTypeData
{
public KafkaSerdesEntityTypeData() { }

(GitHub Actions / Analyze (csharp) flags line 29: non-nullable fields 'typeName' and 'data' must contain a non-null value when exiting the constructor; consider declaring them as nullable.)
@@ -27,9 +33,10 @@ public KafkaSerdesEntityTypeData(string tName, object[] rData)
typeName = tName;
data = rData;
}

public string? typeName;
public object[]? data;
[JsonInclude()]
public string typeName;
[JsonInclude()]
public object[] data;
}

public class KafkaSerdesEntityType : IKafkaSerdesEntityType
@@ -43,27 +50,64 @@ public KafkaSerdesEntityType(IEntityType type)
_properties = _type.GetProperties().ToArray();
}

public object[] Deserialize(string arg)
public object[] Deserialize(Headers headers, string arg)
{
var des = GetFullType(arg);
return ConvertData(des!.data);
}

public TKey Deserialize<TKey>(string arg) => System.Text.Json.JsonSerializer.Deserialize<TKey>(arg)!;
public TKey Deserialize<TKey>(Headers headers, string arg) => System.Text.Json.JsonSerializer.Deserialize<TKey>(arg)!;

public string Serialize(params object?[]? args) => System.Text.Json.JsonSerializer.Serialize(new KafkaSerdesEntityTypeData(_type.Name, args!));
public string Serialize(Headers headers, params object?[]? args) => System.Text.Json.JsonSerializer.Serialize(new KafkaSerdesEntityTypeData(_type.Name, args!));

public string Serialize<TKey>(TKey key) => System.Text.Json.JsonSerializer.Serialize(key);
public string Serialize<TKey>(Headers headers, TKey key) => System.Text.Json.JsonSerializer.Serialize(key);

public static KafkaSerdesEntityTypeData? GetFullType(string arg) => System.Text.Json.JsonSerializer.Deserialize<KafkaSerdesEntityTypeData>(arg);

public object[] ConvertData(object[]? input)
{
if (input == null) return null;

(GitHub Actions / Analyze (csharp) flags line 69: possible null reference return.)
List<object> data = new List<object>();

for (int i = 0; i < input!.Length; i++)
{
input[i] = Convert.ChangeType(input[i], _properties[i].ClrType);
if (input[i] is JsonElement elem)
{
switch (elem.ValueKind)
{
case JsonValueKind.Undefined:
break;
case JsonValueKind.Object:
break;
case JsonValueKind.Array:
break;
case JsonValueKind.String:
data.Add(elem.GetString());

(GitHub Actions / Analyze (csharp) flags line 85: possible null reference argument for parameter 'item' in 'void List<object>.Add(object item)'.)
break;
case JsonValueKind.Number:
var tmp = elem.GetInt64();
data.Add(Convert.ChangeType(tmp, _properties[i].ClrType));
break;
case JsonValueKind.True:
data.Add(true);
break;
case JsonValueKind.False:
data.Add(false);
break;
case JsonValueKind.Null:
data.Add(null);

(GitHub Actions / Analyze (csharp) flags line 98: cannot convert null literal to non-nullable reference type.)
break;
default:
break;
}

}
else
{
data.Add(Convert.ChangeType(input[i], _properties[i].ClrType));
}
}
return input;
return data.ToArray();
}
}
}
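The rewritten ConvertData above accounts for a System.Text.Json behavior: when the declared element type is object, each value deserializes as a boxed JsonElement rather than a CLR primitive, so the code must branch on ValueKind before Convert.ChangeType can map the value onto the entity property's ClrType. A standalone illustration of that behavior, independent of the KNet types in this commit:

using System;
using System.Text.Json;

class JsonElementRoundTrip
{
    static void Main()
    {
        // Serialize a heterogeneous object[] and read it back the way the serdes layer does.
        string json = JsonSerializer.Serialize(new object[] { 42, "title", true });
        object[] values = JsonSerializer.Deserialize<object[]>(json)!;

        // Each element comes back as a JsonElement, not as int/string/bool.
        foreach (object value in values)
        {
            var element = (JsonElement)value;
            Console.WriteLine($"{element.ValueKind}: {element}");
        }
        // Prints: Number: 42, String: title, True: True
    }
}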
3 changes: 1 addition & 2 deletions src/KEFCore/Serdes/Internal/KafkaSerdesFactory.cs
@@ -43,8 +43,7 @@ public virtual IKafkaSerdesEntityType Get(string typeName)
public virtual object[] Deserialize(byte[] data)
{
var str = Encoding.UTF8.GetString(data);
var fulltype = KafkaSerdesEntityType.GetFullType(str);
return Get(fulltype!.typeName!).ConvertData(fulltype.data);
return Deserialize(str);
}

public virtual object[] Deserialize(string data)
3 changes: 2 additions & 1 deletion src/KEFCore/Storage/Internal/IKafkaCluster.cs
@@ -20,6 +20,7 @@
using MASES.EntityFrameworkCore.KNet.Serdes.Internal;
using MASES.EntityFrameworkCore.KNet.ValueGeneration.Internal;
using MASES.KNet.Producer;
using Org.Apache.Kafka.Clients.Producer;

namespace MASES.EntityFrameworkCore.KNet.Storage.Internal;

@@ -45,7 +46,7 @@ bool EnsureConnected(

IKafkaSerdesEntityType CreateSerdes(IEntityType entityType);

IKNetProducer<string, string> CreateProducer(IEntityType entityType);
IProducer<string, string> CreateProducer(IEntityType entityType);

IEnumerable<ValueBuffer> GetData(IEntityType entityType);

8 changes: 4 additions & 4 deletions src/KEFCore/Storage/Internal/IKafkaTable.cs
@@ -33,13 +33,13 @@ public interface IKafkaTable

IEnumerable<object?[]> Rows { get; }

KNetProducerRecord<string, string> Create(IUpdateEntry entry);
ProducerRecord<string, string> Create(IUpdateEntry entry);

KNetProducerRecord<string, string> Delete(IUpdateEntry entry);
ProducerRecord<string, string> Delete(IUpdateEntry entry);

KNetProducerRecord<string, string> Update(IUpdateEntry entry);
ProducerRecord<string, string> Update(IUpdateEntry entry);

IEnumerable<Future<RecordMetadata>> Commit(IEnumerable<KNetProducerRecord<string, string>> records);
IEnumerable<Future<RecordMetadata>> Commit(IEnumerable<ProducerRecord<string, string>> records);

KafkaIntegerValueGenerator<TProperty> GetIntegerValueGenerator<TProperty>(
IProperty property,
(The remaining changed files in this commit are not shown here.)
