Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added KNetKeySerDes and KNetValueSerDes properties to simplify the usage of external serializers #340

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/net/KNet/Specific/Consumer/KNetConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,16 @@ public KNetConsumer(Properties props, bool useJVMCallback = false)
/// <summary>
/// Initialize a new instance of <see cref="KNetConsumer{K, V}"/>
/// </summary>
/// <param name="configBuilder">An instance of <see cref="ConsumerConfigBuilder"/> </param>
/// <param name="useJVMCallback"><see langword="true"/> to active callback based mode</param>
public KNetConsumer(ConsumerConfigBuilder configBuilder, bool useJVMCallback = false)
: this(configBuilder, configBuilder.BuildKeySerDes<K>(), configBuilder.BuildValueSerDes<V>())
{
_autoCreateSerDes = true;
}
/// <summary>
/// Initialize a new instance of <see cref="KNetConsumer{K, V}"/>
/// </summary>
/// <param name="props">The properties to use, see <see cref="ConsumerConfig"/> and <see cref="ConsumerConfigBuilder"/></param>
/// <param name="keyDeserializer">Key serializer base on <see cref="KNetSerDes{K}"/></param>
/// <param name="valueDeserializer">Value serializer base on <see cref="KNetSerDes{K}"/></param>
Expand Down
103 changes: 101 additions & 2 deletions src/net/KNet/Specific/GenericConfigBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
using Java.Util;
using System.Globalization;
using System;
using MASES.KNet.Serialization;
using System.Linq;

namespace MASES.KNet
{
Expand All @@ -44,7 +46,9 @@ public static T CreateFrom(T origin)
if (origin == null) return Create();
var newT = new T
{
_options = new System.Collections.Generic.Dictionary<string, object>(origin._options)
_options = new System.Collections.Generic.Dictionary<string, object>(origin._options),
_KNetKeySerDes = origin._KNetKeySerDes,
_KNetValueSerDes = origin._KNetValueSerDes,
};
return newT;
}
Expand Down Expand Up @@ -106,7 +110,8 @@ protected virtual T Clone()
var clone = new T
{
_options = new System.Collections.Generic.Dictionary<string, object>(_options)
}; return clone;
};
return clone;
}
/// <summary>
/// Returns the <see cref="Properties"/> from the <typeparamref name="T"/> instance
Expand Down Expand Up @@ -137,5 +142,99 @@ public Map<string, string> ToMap()

return props;
}
Type _KNetKeySerDes = null;
/// <summary>
/// The <see cref="Type"/> used to create an instance of <see cref="IKNetSerDes{T}"/> for keys with <see cref="BuildKeySerDes{TKey}"/>
/// </summary>
public Type KNetKeySerDes
{
get { return _KNetKeySerDes; }
set
{
if (value.GetConstructors().Single(ci => ci.GetParameters().Length == 0) == null)
{
throw new ArgumentException($"{value.Name} does not contains a default constructor and cannot be used because it is not a valid Serializer type");
}

if (value.IsGenericType)
{
var keyT = value.GetGenericArguments();
if (keyT.Length != 1) { throw new ArgumentException($"{value.Name} does not contains a single generic argument and cannot be used because it is not a valid Serializer type"); }
var t = value.GetGenericTypeDefinition();
if (t.GetInterface(typeof(IKNetSerDes<>).Name) == null)
{
throw new ArgumentException($"{value.Name} does not implement IKNetSerDes<> and cannot be used because it is not a valid Serializer type");
}
_KNetKeySerDes = value;
}
else throw new ArgumentException($"{value.Name} is not a generic type and cannot be used as a valid ValueContainer type");
}
}

Type _KNetValueSerDes = null;
/// <summary>
/// The <see cref="Type"/> used to create an instance of <see cref="IKNetSerDes{T}"/> for values with <see cref="BuildValueSerDes{TValue}"/>
/// </summary>
public Type KNetValueSerDes
{
get { return _KNetValueSerDes; }
set
{
if (value.GetConstructors().Single(ci => ci.GetParameters().Length == 0) == null)
{
throw new ArgumentException($"{value.Name} does not contains a default constructor and cannot be used because it is not a valid Serializer type");
}

if (value.IsGenericType)
{
var keyT = value.GetGenericArguments();
if (keyT.Length != 1) { throw new ArgumentException($"{value.Name} does not contains a single generic argument and cannot be used because it is not a valid Serializer type"); }
var t = value.GetGenericTypeDefinition();
if (t.GetInterface(typeof(IKNetSerDes<>).Name) == null)
{
throw new ArgumentException($"{value.Name} does not implement IKNetSerDes<> and cannot be used because it is not a valid Serializer type");
}
_KNetValueSerDes = value;
}
else throw new ArgumentException($"{value.Name} is not a generic type and cannot be used as a valid Serializer type");
}
}

/// <summary>
/// Builds an instance of <see cref="IKNetSerDes{TKey}"/> using the <see cref="Type"/> defined in <see cref="KNetKeySerDes"/>
/// </summary>
/// <typeparam name="TKey">The type of the key</typeparam>
/// <returns>An instance of <see cref="IKNetSerDes{TKey}"/></returns>
/// <exception cref="InvalidOperationException">If <see cref="KNetKeySerDes"/> is <see langword="null"/></exception>
public IKNetSerDes<TKey> BuildKeySerDes<TKey>()
{
if (KNetSerialization.IsInternalManaged<TKey>())
{
return new KNetSerDes<TKey>();
}

if (KNetKeySerDes == null) throw new InvalidOperationException($"No default serializer available for {typeof(TKey)}, property {nameof(KNetKeySerDes)} shall be set.");
var tmp = KNetKeySerDes.MakeGenericType(typeof(TKey));
var o = Activator.CreateInstance(tmp);
return o as IKNetSerDes<TKey>;
}
/// <summary>
/// Builds an instance of <see cref="IKNetSerDes{TValue}"/> using the <see cref="Type"/> defined in <see cref="KNetValueSerDes"/>
/// </summary>
/// <typeparam name="TValue">The type of the key</typeparam>
/// <returns>An instance of <see cref="IKNetSerDes{TValue}"/></returns>
/// <exception cref="InvalidOperationException">If <see cref="KNetValueSerDes"/> is <see langword="null"/></exception>
public IKNetSerDes<TValue> BuildValueSerDes<TValue>()
{
if (KNetSerialization.IsInternalManaged<TValue>())
{
return new KNetSerDes<TValue>();
}

if (KNetValueSerDes == null) throw new InvalidOperationException($"No default serializer available for {typeof(TValue)}, property {nameof(KNetValueSerDes)} shall be set.");
var tmp = KNetValueSerDes.MakeGenericType(typeof(TValue));
var o = Activator.CreateInstance(tmp);
return o as IKNetSerDes<TValue>;
}
}
}
15 changes: 12 additions & 3 deletions src/net/KNet/Specific/Producer/KNetProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ public class KNetProducer<K, V> : KafkaProducer<byte[], byte[]>, IKNetProducer<K
/// </summary>
public override string BridgeClassName => "org.mases.knet.clients.producer.KNetProducer";

readonly bool autoCreateSerDes = false;
readonly bool _autoCreateSerDes = false;
readonly IKNetSerializer<K> _keySerializer;
readonly IKNetSerializer<V> _valueSerializer;
/// <summary>
Expand All @@ -175,7 +175,16 @@ public class KNetProducer<K, V> : KafkaProducer<byte[], byte[]>, IKNetProducer<K
public KNetProducer(Properties props)
: this(props, new KNetSerDes<K>(), new KNetSerDes<V>())
{
autoCreateSerDes = true;
_autoCreateSerDes = true;
}
/// <summary>
/// Initialize a new instance of <see cref="KNetProducer{K, V}"/>
/// </summary>
/// <param name="configBuilder">An instance of <see cref="ProducerConfigBuilder"/> </param>
public KNetProducer(ProducerConfigBuilder configBuilder)
: this(configBuilder, configBuilder.BuildKeySerDes<K>(), configBuilder.BuildValueSerDes<V>())
{
_autoCreateSerDes = true;
}
/// <summary>
/// Initialize a new instance of <see cref="KNetProducer{K, V}"/>
Expand Down Expand Up @@ -211,7 +220,7 @@ static Properties CheckProperties(Properties props)
/// </summary>
~KNetProducer()
{
if (autoCreateSerDes)
if (_autoCreateSerDes)
{
_keySerializer?.Dispose();
_valueSerializer?.Dispose();
Expand Down
Loading