diff --git a/src/KurrentDB.Client/Core/protos/dynamic-value.proto b/src/KurrentDB.Client/Core/protos/dynamic-value.proto new file mode 100644 index 000000000..254231803 --- /dev/null +++ b/src/KurrentDB.Client/Core/protos/dynamic-value.proto @@ -0,0 +1,42 @@ +syntax = "proto3"; + +import "google/protobuf/duration.proto"; +import "google/protobuf/timestamp.proto"; +import "google/protobuf/struct.proto"; + +package kurrentdb.protobuf; +option csharp_namespace = "KurrentDB.Protobuf"; + +message DynamicValue { + oneof kind { + // Represents a null value. + google.protobuf.NullValue null_value = 1; + + // Represents a 32-bit signed integer value. + sint32 int32_value = 2; + + // Represents a 64-bit signed integer value. + sint64 int64_value = 3; + + // Represents a byte array value. + bytes bytes_value = 4; + + // Represents a 64-bit double-precision floating-point value. + double double_value = 5; + + // Represents a 32-bit single-precision floating-point value + float float_value = 6; + + // Represents a string value. + string string_value = 7; + + // Represents a boolean value. + bool boolean_value = 8; + + // Represents a timestamp value. + google.protobuf.Timestamp timestamp_value = 9; + + // Represents a duration value. + google.protobuf.Duration duration_value = 10; + } +} diff --git a/src/KurrentDB.Client/Core/protos/streams.v2.proto b/src/KurrentDB.Client/Core/protos/streams.v2.proto new file mode 100644 index 000000000..5944064ff --- /dev/null +++ b/src/KurrentDB.Client/Core/protos/streams.v2.proto @@ -0,0 +1,170 @@ +syntax = "proto3"; + +// +// This protocol is UNSTABLE in the sense of being subject to change. +// + +package kurrentdb.protocol.v2; + +option csharp_namespace = "KurrentDB.Protocol.V2"; +option java_package = "io.kurrentdb.v2"; +option java_multiple_files = true; + +import "dynamic-value.proto"; + +service StreamsService { + // Executes an atomic operation to append records to multiple streams. + // This transactional method ensures that all appends either succeed + // completely, or are entirely rolled back, thereby maintaining strict data + // consistency across all involved streams. + rpc MultiStreamAppend(MultiStreamAppendRequest) returns (MultiStreamAppendResponse); + + // Streaming version of MultiStreamAppend that allows clients to send multiple + // append requests over a single connection. When the stream completes, all + // records are appended transactionally (all succeed or fail together). + // Provides improved efficiency for high-throughput scenarios while + // maintaining the same transactional guarantees. + rpc MultiStreamAppendSession(stream AppendStreamRequest) returns (MultiStreamAppendResponse); +} + +// Record to be appended to a stream. +message AppendRecord { + // Universally Unique identifier for the record. Must be a guid. + // If not provided, the server will generate a new one. + optional string record_id = 1; + + // A collection of properties providing additional system information about the + // record. + map properties = 2; + + // The actual data payload of the record, stored as bytes. + bytes data = 3; +} + +// Constants that match the expected state of a stream during an +// append operation. It can be used to specify whether the stream should exist, +// not exist, or can be in any state. +enum ExpectedRevisionConstants { + // The stream should exist and have a single event. + EXPECTED_REVISION_CONSTANTS_SINGLE_EVENT = 0; + + // It is not important whether the stream exists or not. + EXPECTED_REVISION_CONSTANTS_ANY = -2; + + // The stream should not exist. If it does, the append will fail. + EXPECTED_REVISION_CONSTANTS_NO_STREAM = -1; + + // The stream should exist + EXPECTED_REVISION_CONSTANTS_EXISTS = -4; +} + +// Represents the input for appending records to a specific stream. +message AppendStreamRequest { + // The name of the stream to append records to. + string stream = 1; + + // The records to append to the stream. + repeated AppendRecord records = 2; + + // The expected revision of the stream. If the stream's current revision does + // not match, the append will fail. + // The expected revision can also be one of the special values + // from ExpectedRevisionConstants. + // missing value means no expectation: same as EXPECTED_REVISION_CONSTANTS_ANY + optional sint64 expected_revision = 3; +} + +// Success represents the successful outcome of an append operation. +message AppendStreamSuccess { + // The name of the stream to which records were appended. + string stream = 1; + + // The position of the last appended record in the transaction. + int64 position = 2; + + // The revision of the stream after the append operation. + int64 stream_revision = 3; +} + +// Failure represents the detailed error information when an append operation fails. +message AppendStreamFailure { + // The name of the stream to which records failed to append. + string stream = 1; + + // The error details + oneof error { + // Failed because the actual stream revision didn't match the expected revision. + ErrorDetails.WrongExpectedRevision wrong_expected_revision = 2; + + // Failed because the client lacks sufficient permissions. + ErrorDetails.AccessDenied access_denied = 3; + + // Failed because the target stream has been deleted. + ErrorDetails.StreamDeleted stream_deleted = 4; + + ErrorDetails.TransactionMaxSizeExceeded transaction_max_size_exceeded = 5; + } +} + +// Represents the output of appending records to a specific stream. +message AppendStreamResponse { + // The result of the append operation. + oneof result { + // Success represents the successful outcome of an append operation. + AppendStreamSuccess success = 1; + + // Failure represents the details of a failed append operation. + AppendStreamFailure failure = 2; + } +} + +// MultiStreamAppendRequest represents a request to append records to multiple streams. +message MultiStreamAppendRequest { + // A list of AppendStreamInput messages, each representing a stream to which records should be appended. + repeated AppendStreamRequest input = 1; +} + +// Response from the MultiStreamAppend operation. +message MultiStreamAppendResponse { + oneof result { + // Success represents the successful outcome of a multi-stream append operation. + Success success = 1; + + // Failure represents the details of a failed multi-stream append operation. + Failure failure = 2; + } + + message Success { + repeated AppendStreamSuccess output = 1; + } + + message Failure { + repeated AppendStreamFailure output = 1; + } +} + +// ErrorDetails provides detailed information about specific error conditions. +message ErrorDetails { + // When the user does not have sufficient permissions to perform the operation. + message AccessDenied { + // The reason for access denial. + string reason = 1; + } + + // When the stream has been deleted. + message StreamDeleted { + } + + // When the expected revision of the stream does not match the actual revision. + message WrongExpectedRevision { + // The actual revision of the stream. + int64 stream_revision = 1; + } + + // When the transaction exceeds the maximum size allowed + // (it's bigger than the configured chunk size). + message TransactionMaxSizeExceeded { + // The maximum allowed size of the transaction. + int32 max_size = 1; + } +} diff --git a/src/KurrentDB.Client/KurrentDB.Client.csproj b/src/KurrentDB.Client/KurrentDB.Client.csproj index 581aaaa56..af5ffe1a3 100644 --- a/src/KurrentDB.Client/KurrentDB.Client.csproj +++ b/src/KurrentDB.Client/KurrentDB.Client.csproj @@ -83,6 +83,12 @@ MSBuild:Compile + + MSBuild:Compile + + + MSBuild:Compile + diff --git a/src/KurrentDB.Client/Streams/AppendRequest.cs b/src/KurrentDB.Client/Streams/AppendRequest.cs new file mode 100644 index 000000000..8d2a5b97b --- /dev/null +++ b/src/KurrentDB.Client/Streams/AppendRequest.cs @@ -0,0 +1,3 @@ +namespace KurrentDB.Client; + +public record AppendRequest(string StreamName, StreamState ExpectedState, IEnumerable Messages); diff --git a/src/KurrentDB.Client/Streams/AppendStreamFailure.cs b/src/KurrentDB.Client/Streams/AppendStreamFailure.cs new file mode 100644 index 000000000..fb297ed56 --- /dev/null +++ b/src/KurrentDB.Client/Streams/AppendStreamFailure.cs @@ -0,0 +1,35 @@ +using KurrentDB.Protocol.V2; + +namespace KurrentDB.Client; + +public struct AppendStreamFailure { + public string Stream { get; } + public string? AccessDenied { get; } + public long? WrongExpectedStreamRevision { get; } + public bool IsStreamDeleted { get; } = false; + public int? TransactionMaxSizeExceeded { get; } + + internal AppendStreamFailure(KurrentDB.Protocol.V2.AppendStreamFailure grpcFailure) { + Stream = grpcFailure.Stream; + + switch (grpcFailure.ErrorCase) { + case Protocol.V2.AppendStreamFailure.ErrorOneofCase.AccessDenied: + AccessDenied = grpcFailure.AccessDenied.Reason; + break; + + case Protocol.V2.AppendStreamFailure.ErrorOneofCase.StreamDeleted: IsStreamDeleted = true; + break; + + case Protocol.V2.AppendStreamFailure.ErrorOneofCase.WrongExpectedRevision: + WrongExpectedStreamRevision = grpcFailure.WrongExpectedRevision.StreamRevision; + break; + + case Protocol.V2.AppendStreamFailure.ErrorOneofCase.TransactionMaxSizeExceeded: + TransactionMaxSizeExceeded = grpcFailure.TransactionMaxSizeExceeded.MaxSize; + break; + + default: + throw new ArgumentOutOfRangeException(nameof(grpcFailure)); + } + } +} diff --git a/src/KurrentDB.Client/Streams/AppendStreamSuccess.cs b/src/KurrentDB.Client/Streams/AppendStreamSuccess.cs new file mode 100644 index 000000000..70124f21e --- /dev/null +++ b/src/KurrentDB.Client/Streams/AppendStreamSuccess.cs @@ -0,0 +1,7 @@ +namespace KurrentDB.Client; + +public readonly struct AppendStreamSuccess(string stream, long position, StreamState nextStreamState) { + public string Stream { get; } = stream; + public long Position { get; } = position; + public StreamState NextStreamState { get; } = nextStreamState; +} diff --git a/src/KurrentDB.Client/Streams/KurrentDBClient.MultiAppend.cs b/src/KurrentDB.Client/Streams/KurrentDBClient.MultiAppend.cs new file mode 100644 index 000000000..acb9c2cc7 --- /dev/null +++ b/src/KurrentDB.Client/Streams/KurrentDBClient.MultiAppend.cs @@ -0,0 +1,70 @@ +using Google.Protobuf; +using KurrentDB.Client.Core.Serialization; +using KurrentDB.Protocol.V2; + +namespace KurrentDB.Client; + +public partial class KurrentDBClient { + /// + /// Appends events asynchronously to a stream. Messages are serialized using default or custom serialization configured through + /// + /// Messages to append to the stream. + /// Optional settings for the append operation, e.g. deadline, user credentials etc. + /// The optional . + /// + public Task MultiAppend( + IEnumerable requests, + AppendToStreamOptions? options = null, + CancellationToken token = default + ) { + return MultiAppendInternal(options, requests, token).AsTask(); + } + + async ValueTask MultiAppendInternal( + AppendToStreamOptions? options, IEnumerable requests, CancellationToken token + ) { + var channelInfo = await GetChannelInfo(token).ConfigureAwait(false); + var client = new StreamsService.StreamsServiceClient(channelInfo.CallInvoker).MultiStreamAppendSession( + KurrentDBCallOptions.CreateStreaming( + Settings, + userCredentials: Settings.DefaultCredentials, + cancellationToken: token + ) + ); + + foreach (var request in GetRequests(options, requests, token)) + await client.RequestStream.WriteAsync(request).ConfigureAwait(false); + + await client.RequestStream.CompleteAsync().ConfigureAwait(false); + return new MultiStreamWriteResult(await client.ResponseAsync); + } + + IEnumerable GetRequests( + AppendToStreamOptions? options, + IEnumerable requests, + CancellationToken token + ) { + foreach (var request in requests) { + token.ThrowIfCancellationRequested(); + + var serializationContext = + new MessageSerializationContext(MessageTypeNamingResolutionContext.FromStreamName(request.StreamName)); + + var messages = _messageSerializer.With(options?.SerializationSettings) + .Serialize(request.Messages, serializationContext); + + + yield return new AppendStreamRequest { + Stream = request.StreamName, + ExpectedRevision = request.ExpectedState.ToInt64(), + Records = { + messages.Select(x => new AppendRecord { + RecordId = x.MessageId.ToString(), + Data = ByteString.CopyFrom(x.Data.Span), + } + ) + } + }; + } + } +} diff --git a/src/KurrentDB.Client/Streams/MultiStreamWriteResult.cs b/src/KurrentDB.Client/Streams/MultiStreamWriteResult.cs new file mode 100644 index 000000000..951464a6a --- /dev/null +++ b/src/KurrentDB.Client/Streams/MultiStreamWriteResult.cs @@ -0,0 +1,38 @@ +using JetBrains.Annotations; +using KurrentDB.Protocol.V2; + +namespace KurrentDB.Client; + +[PublicAPI] +public class MultiStreamWriteResult { + readonly List? _successes; + public List Failures { get; } = []; + + internal MultiStreamWriteResult(MultiStreamAppendResponse response) { + if (response.ResultCase == MultiStreamAppendResponse.ResultOneofCase.Success) { + _successes = new List(response.Success.Output.Count); + + foreach (var success in response.Success.Output) + _successes.Add( + new AppendStreamSuccess(success.Stream, success.Position, new StreamState(success.StreamRevision)) + ); + } else { + _successes = null; + Failures = new List(response.Failure.Output.Count); + + foreach (var failure in response.Failure.Output) + Failures.Add(new AppendStreamFailure(failure)); + } + } + + public bool TryGetSuccesses(out List successes) { + successes = []; + + if (_successes == null) + return false; + + successes = _successes; + return true; + + } +}