Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updates on serializers retrieve #453

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,33 @@
*/

using MASES.JCOBridge.C2JBridge;
using MASES.JCOBridge.C2JBridge.JVMInterop;

namespace Org.Apache.Kafka.Common.Serialization
{
/// <summary>
/// <summary>
/// Listener for Kafka Serializer. Extends <see cref="JVMBridgeListener"/>. Implements <see cref="ISerializer{T}"/>
/// </summary>
/// <remarks>Dispose the object to avoid a resource leak, the object contains a reference to the corresponding JVM object</remarks>
public partial class Serde<T> : ISerde<T>
public partial class SerdeDirect<T> : ISerde<T>
{
/// <summary>
/// <see href="https://www.javadoc.io/doc/org.apache.kafka/kafka-clients/3.6.2/org/apache/kafka/common/serialization/Serde.html#deserializer--"/>
/// </summary>
/// <returns><see cref="Org.Apache.Kafka.Common.Serialization.Deserializer"/></returns>
public Org.Apache.Kafka.Common.Serialization.Deserializer<T> DeserializerDirect()
masesdevelopers marked this conversation as resolved.
Show resolved Hide resolved
{
return new DeserializerDirect<T>();
var res = this.IExecute("deserializer") as IJavaObject;
return WrapsDirect<DeserializerDirect<T>>(res);
}
/// <summary>
/// <see href="https://www.javadoc.io/doc/org.apache.kafka/kafka-clients/3.6.2/org/apache/kafka/common/serialization/Serde.html#serializer--"/>
/// </summary>
/// <returns><see cref="Org.Apache.Kafka.Common.Serialization.Serializer"/></returns>
public Org.Apache.Kafka.Common.Serialization.Serializer<T> SerializerDirect()
masesdevelopers marked this conversation as resolved.
Show resolved Hide resolved
{
return new SerializerDirect<T>();
var res = this.IExecute("serializer") as IJavaObject;
return WrapsDirect<SerializerDirect<T>>(res);
}
}
}
58 changes: 19 additions & 39 deletions src/net/KNet/Specific/Serialization/SerDes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -114,81 +114,61 @@ public SerDes()
throw new InvalidOperationException($"{_SerializationType} is not valid.");
}

SerdeDirect<TJVMT> kSerde = null;

if (IsDirectBuffered)
{
_KafkaSerializer = new KNetByteBufferSerializer().CastDirect<Serializer<TJVMT>>();
_KafkaDeserializer = new KNetByteBufferDeserializer().CastDirect<Deserializer<TJVMT>>();
_KafkaSerde = new KNetSerdes.ByteBufferSerde().CastDirect<Serde<TJVMT>>();
kSerde = new KNetSerdes.ByteBufferSerde().CastTo<SerdeDirect<TJVMT>>();
}
else
{
switch (_JVMSerializationType)
{
case KNetSerialization.SerializationType.Boolean:
_KafkaSerializer = new BooleanSerializer().CastDirect<Serializer<TJVMT>>();
_KafkaDeserializer = new BooleanDeserializer().CastDirect<Deserializer<TJVMT>>();
_KafkaSerde = new Serdes.BooleanSerde().CastDirect<Serde<TJVMT>>();
kSerde = new Serdes.BooleanSerde().CastTo<SerdeDirect<TJVMT>>();
break;
case KNetSerialization.SerializationType.ByteArray:
_KafkaSerializer = new ByteArraySerializer().CastDirect<Serializer<TJVMT>>();
_KafkaDeserializer = new ByteArrayDeserializer().CastDirect<Deserializer<TJVMT>>();
_KafkaSerde = new Serdes.ByteArraySerde().CastDirect<Serde<TJVMT>>();
kSerde = new Serdes.ByteArraySerde().CastTo<SerdeDirect<TJVMT>>();
break;
case KNetSerialization.SerializationType.ByteBuffer:
_KafkaSerializer = new ByteBufferSerializer().CastDirect<Serializer<TJVMT>>();
_KafkaDeserializer = new ByteBufferDeserializer().CastDirect<Deserializer<TJVMT>>();
_KafkaSerde = new Serdes.ByteBufferSerde().CastDirect<Serde<TJVMT>>();
kSerde = new Serdes.ByteBufferSerde().CastTo<SerdeDirect<TJVMT>>();
break;
case KNetSerialization.SerializationType.Bytes:
_KafkaSerializer = new BytesSerializer().CastDirect<Serializer<TJVMT>>();
_KafkaDeserializer = new BytesDeserializer().CastDirect<Deserializer<TJVMT>>();
_KafkaSerde = new Serdes.BytesSerde().CastDirect<Serde<TJVMT>>();
kSerde = new Serdes.BytesSerde().CastTo<SerdeDirect<TJVMT>>();
break;
case KNetSerialization.SerializationType.Double:
_KafkaSerializer = new DoubleSerializer().CastDirect<Serializer<TJVMT>>();
_KafkaDeserializer = new DoubleDeserializer().CastDirect<Deserializer<TJVMT>>();
_KafkaSerde = new Serdes.DoubleSerde().CastDirect<Serde<TJVMT>>();
kSerde = new Serdes.DoubleSerde().CastTo<SerdeDirect<TJVMT>>();
break;
case KNetSerialization.SerializationType.Float:
_KafkaSerializer = new FloatSerializer().CastDirect<Serializer<TJVMT>>();
_KafkaDeserializer = new FloatDeserializer().CastDirect<Deserializer<TJVMT>>();
_KafkaSerde = new Serdes.FloatSerde().CastDirect<Serde<TJVMT>>();
kSerde = new Serdes.FloatSerde().CastTo<SerdeDirect<TJVMT>>();
break;
case KNetSerialization.SerializationType.Integer:
_KafkaSerializer = new IntegerSerializer().CastDirect<Serializer<TJVMT>>();
_KafkaDeserializer = new IntegerDeserializer().CastDirect<Deserializer<TJVMT>>();
_KafkaSerde = new Serdes.IntegerSerde().CastDirect<Serde<TJVMT>>();
kSerde = new Serdes.IntegerSerde().CastTo<SerdeDirect<TJVMT>>();
break;
case KNetSerialization.SerializationType.Long:
_KafkaSerializer = new LongSerializer().CastDirect<Serializer<TJVMT>>();
_KafkaDeserializer = new LongDeserializer().CastDirect<Deserializer<TJVMT>>();
_KafkaSerde = new Serdes.LongSerde().CastDirect<Serde<TJVMT>>();
kSerde = new Serdes.LongSerde().CastTo<SerdeDirect<TJVMT>>();
break;
case KNetSerialization.SerializationType.Short:
_KafkaSerializer = new ShortSerializer().CastDirect<Serializer<TJVMT>>();
_KafkaDeserializer = new ShortDeserializer().CastDirect<Deserializer<TJVMT>>();
_KafkaSerde = new Serdes.ShortSerde().CastDirect<Serde<TJVMT>>();
kSerde = new Serdes.ShortSerde().CastTo<SerdeDirect<TJVMT>>();
break;
case KNetSerialization.SerializationType.String:
_KafkaSerializer = new StringSerializer().CastDirect<Serializer<TJVMT>>();
_KafkaDeserializer = new StringDeserializer().CastDirect<Deserializer<TJVMT>>();
_KafkaSerde = new Serdes.StringSerde().CastDirect<Serde<TJVMT>>();
kSerde = new Serdes.StringSerde().CastTo<SerdeDirect<TJVMT>>();
break;
case KNetSerialization.SerializationType.Guid:
_KafkaSerializer = new UUIDSerializer().CastDirect<Serializer<TJVMT>>();
_KafkaDeserializer = new UUIDDeserializer().CastDirect<Deserializer<TJVMT>>();
_KafkaSerde = new Serdes.UUIDSerde().CastDirect<Serde<TJVMT>>();
kSerde = new Serdes.UUIDSerde().CastTo<SerdeDirect<TJVMT>>();
break;
case KNetSerialization.SerializationType.Void:
_KafkaSerializer = new VoidSerializer().CastDirect<Serializer<TJVMT>>();
_KafkaDeserializer = new VoidDeserializer().CastDirect<Deserializer<TJVMT>>();
_KafkaSerde = new Serdes.VoidSerde().CastDirect<Serde<TJVMT>>();
kSerde = new Serdes.VoidSerde().CastTo<SerdeDirect<TJVMT>>();
break;
case KNetSerialization.SerializationType.External:
default:
throw new InvalidOperationException($"{typeof(T)} needs an external serializer: set {nameof(OnSerialize)} or {nameof(OnSerializeWithHeaders)}.");
}
}

_KafkaSerde = kSerde;
_KafkaSerializer = kSerde.SerializerDirect();
_KafkaDeserializer = kSerde.DeserializerDirect();
}
/// <summary>
/// Finalizer
Expand Down