Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MessagePack codec #8546

Merged
merged 5 commits into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@
<PackageVersion Include="Microsoft.Extensions.Configuration.AzureKeyVault" Version="3.1.24" />
<PackageVersion Include="System.CommandLine" Version="2.0.0-beta1.21308.1" />
<PackageVersion Include="Microsoft.Crank.EventSources" Version="0.2.0-alpha.23422.5" />
<PackageVersion Include="MessagePack" Version="2.5.124" />
<PackageVersion Include="MessagePack" Version="2.5.168" />
<PackageVersion Include="ZeroFormatter" Version="1.6.4" />
<PackageVersion Include="Utf8Json" Version="1.3.7" />
<PackageVersion Include="SpanJson" Version="4.0.1" />
Expand Down
7 changes: 7 additions & 0 deletions Orleans.sln
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DashboardToy.AppHost", "pla
EndProject
Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Orleans.Serialization.FSharp.Tests", "test\Orleans.Serialization.FSharp.Tests\Orleans.Serialization.FSharp.Tests.fsproj", "{B2D53D3C-E44A-4C9B-AAEE-28FB8C1BDF62}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Orleans.Serialization.MessagePack", "src\Orleans.Serialization.MessagePack\Orleans.Serialization.MessagePack.csproj", "{F50F81B6-E9B5-4143-B66B-A1AD913F6E9C}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -622,6 +624,10 @@ Global
{B2D53D3C-E44A-4C9B-AAEE-28FB8C1BDF62}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B2D53D3C-E44A-4C9B-AAEE-28FB8C1BDF62}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B2D53D3C-E44A-4C9B-AAEE-28FB8C1BDF62}.Release|Any CPU.Build.0 = Release|Any CPU
{F50F81B6-E9B5-4143-B66B-A1AD913F6E9C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{F50F81B6-E9B5-4143-B66B-A1AD913F6E9C}.Debug|Any CPU.Build.0 = Debug|Any CPU
{F50F81B6-E9B5-4143-B66B-A1AD913F6E9C}.Release|Any CPU.ActiveCfg = Release|Any CPU
{F50F81B6-E9B5-4143-B66B-A1AD913F6E9C}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -737,6 +743,7 @@ Global
{C4DD4F96-3EC6-47C6-97AA-9B14F0F2099B} = {316CDCC7-323F-4264-9FC9-667662BB1F80}
{84B44F1D-B7FE-40E3-82F0-730A55AC8613} = {316CDCC7-323F-4264-9FC9-667662BB1F80}
{B2D53D3C-E44A-4C9B-AAEE-28FB8C1BDF62} = {A6573187-FD0D-4DF7-91D1-03E07E470C0A}
{F50F81B6-E9B5-4143-B66B-A1AD913F6E9C} = {4CD3AA9E-D937-48CA-BB6C-158E12257D23}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {7BFB3429-B5BB-4DB1-95B4-67D77A864952}
Expand Down
256 changes: 256 additions & 0 deletions src/Orleans.Serialization.MessagePack/MessagePackCodec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
using System;
using System.Buffers;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Runtime.Serialization;
using MessagePack;
using Microsoft.Extensions.Options;
using Orleans.Serialization.Buffers;
using Orleans.Serialization.Buffers.Adaptors;
using Orleans.Serialization.Cloning;
using Orleans.Serialization.Codecs;
using Orleans.Serialization.Serializers;
using Orleans.Serialization.WireProtocol;

namespace Orleans.Serialization;

/// <summary>
/// A serialization codec which uses <see cref="MessagePackSerializer"/>.
/// </summary>
/// <remarks>
/// MessagePack codec performs slightly worse than default Orleans serializer, if performance is critical for your application, consider using default serialization.
/// </remarks>
[Alias(WellKnownAlias)]
public class MessagePackCodec : IGeneralizedCodec, IGeneralizedCopier, ITypeFilter
{
private static readonly ConcurrentDictionary<Type, bool> SupportedTypes = new();

private static readonly Type SelfType = typeof(MessagePackCodec);

private readonly ICodecSelector[] _serializableTypeSelectors;
private readonly ICopierSelector[] _copyableTypeSelectors;
private readonly MessagePackCodecOptions _options;

/// <summary>
/// The well-known type alias for this codec.
/// </summary>
public const string WellKnownAlias = "msgpack";

/// <summary>
/// Initializes a new instance of the <see cref="MessagePackCodec"/> class.
/// </summary>
/// /// <param name="serializableTypeSelectors">Filters used to indicate which types should be serialized by this codec.</param>
/// <param name="copyableTypeSelectors">Filters used to indicate which types should be copied by this codec.</param>
/// <param name="options">The MessagePack codec options.</param>
public MessagePackCodec(
IEnumerable<ICodecSelector> serializableTypeSelectors,
IEnumerable<ICopierSelector> copyableTypeSelectors,
IOptions<MessagePackCodecOptions> options)
{
_serializableTypeSelectors = serializableTypeSelectors.Where(t => string.Equals(t.CodecName, WellKnownAlias, StringComparison.Ordinal)).ToArray();
_copyableTypeSelectors = copyableTypeSelectors.Where(t => string.Equals(t.CopierName, WellKnownAlias, StringComparison.Ordinal)).ToArray();
_options = options.Value;
}

/// <inheritdoc/>
void IFieldCodec.WriteField<TBufferWriter>(ref Writer<TBufferWriter> writer, uint fieldIdDelta, Type expectedType, object value)
{
if (ReferenceCodec.TryWriteReferenceField(ref writer, fieldIdDelta, expectedType, value))
{
return;
}

// The schema type when serializing the field is the type of the codec.
writer.WriteFieldHeader(fieldIdDelta, expectedType, SelfType, WireType.TagDelimited);

// Write the type name
ReferenceCodec.MarkValueField(writer.Session);
writer.WriteFieldHeaderExpected(0, WireType.LengthPrefixed);
writer.Session.TypeCodec.WriteLengthPrefixed(ref writer, value.GetType());

var bufferWriter = new BufferWriterBox<PooledBuffer>(new());
try
{

var msgPackWriter = new MessagePackWriter(bufferWriter);
MessagePackSerializer.Serialize(expectedType ?? value.GetType(), ref msgPackWriter, value, _options.SerializerOptions);
msgPackWriter.Flush();

ReferenceCodec.MarkValueField(writer.Session);
writer.WriteFieldHeaderExpected(1, WireType.LengthPrefixed);
writer.WriteVarUInt32((uint)bufferWriter.Value.Length);
bufferWriter.Value.CopyTo(ref writer);
}
finally
{
bufferWriter.Value.Dispose();
}

writer.WriteEndObject();
}

/// <inheritdoc/>
object IFieldCodec.ReadValue<TInput>(ref Reader<TInput> reader, Field field)
{
if (field.IsReference)
{
return ReferenceCodec.ReadReference(ref reader, field.FieldType);
}

field.EnsureWireTypeTagDelimited();

var placeholderReferenceId = ReferenceCodec.CreateRecordPlaceholder(reader.Session);
object result = null;
Type type = null;
uint fieldId = 0;
while (true)
{
var header = reader.ReadFieldHeader();
if (header.IsEndBaseOrEndObject)
{
break;
}

fieldId += header.FieldIdDelta;
switch (fieldId)
{
case 0:
ReferenceCodec.MarkValueField(reader.Session);
type = reader.Session.TypeCodec.ReadLengthPrefixed(ref reader);
break;
case 1:
if (type is null)
{
ThrowTypeFieldMissing();
}

ReferenceCodec.MarkValueField(reader.Session);
var length = reader.ReadVarUInt32();

var bufferWriter = new BufferWriterBox<PooledBuffer>(new());
try
{
reader.ReadBytes(ref bufferWriter, (int)length);
result = MessagePackSerializer.Deserialize(type, bufferWriter.Value.AsReadOnlySequence(), _options.SerializerOptions);
}
finally
{
bufferWriter.Value.Dispose();
}

break;
default:
reader.ConsumeUnknownField(header);
break;
}
}

ReferenceCodec.RecordObject(reader.Session, result, placeholderReferenceId);
return result;
}

/// <inheritdoc/>
bool IGeneralizedCodec.IsSupportedType(Type type)
{
if (type == SelfType)
{
return true;
}

if (CommonCodecTypeFilter.IsAbstractOrFrameworkType(type))
{
return false;
}

foreach (var selector in _serializableTypeSelectors)
{
if (selector.IsSupportedType(type))
{
return true;
}
}

if (_options.IsSerializableType?.Invoke(type) is bool value)
{
return value;
}

return IsMessagePackContract(type, _options.AllowDataContractAttributes);
}

/// <inheritdoc/>
object IDeepCopier.DeepCopy(object input, CopyContext context)
{
if (context.TryGetCopy(input, out object result))
{
return result;
}

var bufferWriter = new BufferWriterBox<PooledBuffer>(new());
try
{
var msgPackWriter = new MessagePackWriter(bufferWriter);
MessagePackSerializer.Serialize(input.GetType(), ref msgPackWriter, input, _options.SerializerOptions);
msgPackWriter.Flush();

var sequence = bufferWriter.Value.AsReadOnlySequence();
result = MessagePackSerializer.Deserialize(input.GetType(), sequence, _options.SerializerOptions);
}
catch
{
bufferWriter.Value.Dispose();
}

context.RecordCopy(input, result);
return result;
}

/// <inheritdoc/>
bool IGeneralizedCopier.IsSupportedType(Type type)
{
if (CommonCodecTypeFilter.IsAbstractOrFrameworkType(type))
{
return false;
}

foreach (var selector in _copyableTypeSelectors)
{
if (selector.IsSupportedType(type))
{
return true;
}
}

if (_options.IsCopyableType?.Invoke(type) is bool value)
{
return value;
}

return IsMessagePackContract(type, _options.AllowDataContractAttributes);
}

/// <inheritdoc/>
bool? ITypeFilter.IsTypeAllowed(Type type) => (((IGeneralizedCopier)this).IsSupportedType(type) || ((IGeneralizedCodec)this).IsSupportedType(type)) ? true : null;

private static bool IsMessagePackContract(Type type, bool allowDataContractAttribute)
{
if (SupportedTypes.TryGetValue(type, out bool isMsgPackContract))
{
return isMsgPackContract;
}

isMsgPackContract = type.GetCustomAttribute<MessagePackObjectAttribute>() is not null;

if (!isMsgPackContract && allowDataContractAttribute)
{
isMsgPackContract = type.GetCustomAttribute<DataContractAttribute>() is DataContractAttribute;
}

SupportedTypes.TryAdd(type, isMsgPackContract);
return isMsgPackContract;
}

private static void ThrowTypeFieldMissing() => throw new RequiredFieldMissingException("Serialized value is missing its type field.");
}
31 changes: 31 additions & 0 deletions src/Orleans.Serialization.MessagePack/MessagePackCodecOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
using System;
using System.Runtime.Serialization;
using MessagePack;

namespace Orleans.Serialization;

/// <summary>
/// Options for <see cref="MessagePackCodec"/>.
/// </summary>
public class MessagePackCodecOptions
{
/// <summary>
/// Gets or sets the <see cref="MessagePackSerializerOptions"/>.
/// </summary>
public MessagePackSerializerOptions SerializerOptions { get; set; } = MessagePackSerializerOptions.Standard;

/// <summary>
/// Get or sets flag that allows the use of <see cref="DataContractAttribute"/> marked contracts for MessagePackSerializer.
/// </summary>
public bool AllowDataContractAttributes { get; set; }

/// <summary>
/// Gets or sets a delegate used to determine if a type is supported by the MessagePack serializer for serialization and deserialization.
/// </summary>
public Func<Type, bool?> IsSerializableType { get; set; }

/// <summary>
/// Gets or sets a delegate used to determine if a type is supported by the MessagePack serializer for copying.
/// </summary>
public Func<Type, bool?> IsCopyableType { get; set; }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<PackageId>Microsoft.Orleans.Serialization.MessagePack</PackageId>
<TargetFrameworks>$(DefaultTargetFrameworks);netstandard2.1</TargetFrameworks>
<PackageDescription>MessagePack integration for Orleans.Serialization</PackageDescription>
<OrleansBuildTimeCodeGen>true</OrleansBuildTimeCodeGen>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="MessagePack" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Orleans.Serialization\Orleans.Serialization.csproj" />
</ItemGroup>

</Project>
Loading
Loading