Skip to content

feat: implement multi stream append #350

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions src/KurrentDB.Client/Core/protos/dynamic-value.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
syntax = "proto3";

import "google/protobuf/duration.proto";
import "google/protobuf/timestamp.proto";
import "google/protobuf/struct.proto";

package kurrentdb.protobuf;
option csharp_namespace = "KurrentDB.Protobuf";

message DynamicValue {
oneof kind {
// Represents a null value.
google.protobuf.NullValue null_value = 1;

// Represents a 32-bit signed integer value.
sint32 int32_value = 2;

// Represents a 64-bit signed integer value.
sint64 int64_value = 3;

// Represents a byte array value.
bytes bytes_value = 4;

// Represents a 64-bit double-precision floating-point value.
double double_value = 5;

// Represents a 32-bit single-precision floating-point value
float float_value = 6;

// Represents a string value.
string string_value = 7;

// Represents a boolean value.
bool boolean_value = 8;

// Represents a timestamp value.
google.protobuf.Timestamp timestamp_value = 9;

// Represents a duration value.
google.protobuf.Duration duration_value = 10;
}
}
170 changes: 170 additions & 0 deletions src/KurrentDB.Client/Core/protos/streams.v2.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
syntax = "proto3";

//
// This protocol is UNSTABLE in the sense of being subject to change.
//

package kurrentdb.protocol.v2;

option csharp_namespace = "KurrentDB.Protocol.V2";
option java_package = "io.kurrentdb.v2";
option java_multiple_files = true;

import "dynamic-value.proto";

service StreamsService {
// Executes an atomic operation to append records to multiple streams.
// This transactional method ensures that all appends either succeed
// completely, or are entirely rolled back, thereby maintaining strict data
// consistency across all involved streams.
rpc MultiStreamAppend(MultiStreamAppendRequest) returns (MultiStreamAppendResponse);

// Streaming version of MultiStreamAppend that allows clients to send multiple
// append requests over a single connection. When the stream completes, all
// records are appended transactionally (all succeed or fail together).
// Provides improved efficiency for high-throughput scenarios while
// maintaining the same transactional guarantees.
rpc MultiStreamAppendSession(stream AppendStreamRequest) returns (MultiStreamAppendResponse);
}

// Record to be appended to a stream.
message AppendRecord {
// Universally Unique identifier for the record. Must be a guid.
// If not provided, the server will generate a new one.
optional string record_id = 1;

// A collection of properties providing additional system information about the
// record.
map<string, kurrentdb.protobuf.DynamicValue> properties = 2;

// The actual data payload of the record, stored as bytes.
bytes data = 3;
}

// Constants that match the expected state of a stream during an
// append operation. It can be used to specify whether the stream should exist,
// not exist, or can be in any state.
enum ExpectedRevisionConstants {
// The stream should exist and have a single event.
EXPECTED_REVISION_CONSTANTS_SINGLE_EVENT = 0;

// It is not important whether the stream exists or not.
EXPECTED_REVISION_CONSTANTS_ANY = -2;

// The stream should not exist. If it does, the append will fail.
EXPECTED_REVISION_CONSTANTS_NO_STREAM = -1;

// The stream should exist
EXPECTED_REVISION_CONSTANTS_EXISTS = -4;
}

// Represents the input for appending records to a specific stream.
message AppendStreamRequest {
// The name of the stream to append records to.
string stream = 1;

// The records to append to the stream.
repeated AppendRecord records = 2;

// The expected revision of the stream. If the stream's current revision does
// not match, the append will fail.
// The expected revision can also be one of the special values
// from ExpectedRevisionConstants.
// missing value means no expectation: same as EXPECTED_REVISION_CONSTANTS_ANY
optional sint64 expected_revision = 3;
}

// Success represents the successful outcome of an append operation.
message AppendStreamSuccess {
// The name of the stream to which records were appended.
string stream = 1;

// The position of the last appended record in the transaction.
int64 position = 2;

// The revision of the stream after the append operation.
int64 stream_revision = 3;
}

// Failure represents the detailed error information when an append operation fails.
message AppendStreamFailure {
// The name of the stream to which records failed to append.
string stream = 1;

// The error details
oneof error {
// Failed because the actual stream revision didn't match the expected revision.
ErrorDetails.WrongExpectedRevision wrong_expected_revision = 2;

// Failed because the client lacks sufficient permissions.
ErrorDetails.AccessDenied access_denied = 3;

// Failed because the target stream has been deleted.
ErrorDetails.StreamDeleted stream_deleted = 4;

ErrorDetails.TransactionMaxSizeExceeded transaction_max_size_exceeded = 5;
}
}

// Represents the output of appending records to a specific stream.
message AppendStreamResponse {
// The result of the append operation.
oneof result {
// Success represents the successful outcome of an append operation.
AppendStreamSuccess success = 1;

// Failure represents the details of a failed append operation.
AppendStreamFailure failure = 2;
}
}

// MultiStreamAppendRequest represents a request to append records to multiple streams.
message MultiStreamAppendRequest {
// A list of AppendStreamInput messages, each representing a stream to which records should be appended.
repeated AppendStreamRequest input = 1;
}

// Response from the MultiStreamAppend operation.
message MultiStreamAppendResponse {
oneof result {
// Success represents the successful outcome of a multi-stream append operation.
Success success = 1;

// Failure represents the details of a failed multi-stream append operation.
Failure failure = 2;
}

message Success {
repeated AppendStreamSuccess output = 1;
}

message Failure {
repeated AppendStreamFailure output = 1;
}
}

// ErrorDetails provides detailed information about specific error conditions.
message ErrorDetails {
// When the user does not have sufficient permissions to perform the operation.
message AccessDenied {
// The reason for access denial.
string reason = 1;
}

// When the stream has been deleted.
message StreamDeleted {
}

// When the expected revision of the stream does not match the actual revision.
message WrongExpectedRevision {
// The actual revision of the stream.
int64 stream_revision = 1;
}

// When the transaction exceeds the maximum size allowed
// (it's bigger than the configured chunk size).
message TransactionMaxSizeExceeded {
// The maximum allowed size of the transaction.
int32 max_size = 1;
}
}
6 changes: 6 additions & 0 deletions src/KurrentDB.Client/KurrentDB.Client.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,12 @@
<Protobuf Update="Core\protos\usermanagement.proto">
<Generator>MSBuild:Compile</Generator>
</Protobuf>
<Protobuf Update="Core\protos\stream.v2.proto">
<Generator>MSBuild:Compile</Generator>
</Protobuf>
<Protobuf Update="Core\protos\dynamic-value.proto">
<Generator>MSBuild:Compile</Generator>
</Protobuf>

<!-- -->
<!-- <Protobuf-->
Expand Down
3 changes: 3 additions & 0 deletions src/KurrentDB.Client/Streams/AppendRequest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
namespace KurrentDB.Client;

public record AppendRequest(string StreamName, StreamState ExpectedState, IEnumerable<Message> Messages);
35 changes: 35 additions & 0 deletions src/KurrentDB.Client/Streams/AppendStreamFailure.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
using KurrentDB.Protocol.V2;

namespace KurrentDB.Client;

public struct AppendStreamFailure {
public string Stream { get; }
public string? AccessDenied { get; }
public long? WrongExpectedStreamRevision { get; }
public bool IsStreamDeleted { get; } = false;
public int? TransactionMaxSizeExceeded { get; }

internal AppendStreamFailure(KurrentDB.Protocol.V2.AppendStreamFailure grpcFailure) {
Stream = grpcFailure.Stream;

switch (grpcFailure.ErrorCase) {
case Protocol.V2.AppendStreamFailure.ErrorOneofCase.AccessDenied:
AccessDenied = grpcFailure.AccessDenied.Reason;
break;

case Protocol.V2.AppendStreamFailure.ErrorOneofCase.StreamDeleted: IsStreamDeleted = true;
break;

case Protocol.V2.AppendStreamFailure.ErrorOneofCase.WrongExpectedRevision:
WrongExpectedStreamRevision = grpcFailure.WrongExpectedRevision.StreamRevision;
break;

case Protocol.V2.AppendStreamFailure.ErrorOneofCase.TransactionMaxSizeExceeded:
TransactionMaxSizeExceeded = grpcFailure.TransactionMaxSizeExceeded.MaxSize;
break;

default:
throw new ArgumentOutOfRangeException(nameof(grpcFailure));
}
}
}
7 changes: 7 additions & 0 deletions src/KurrentDB.Client/Streams/AppendStreamSuccess.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace KurrentDB.Client;

public readonly struct AppendStreamSuccess(string stream, long position, StreamState nextStreamState) {
public string Stream { get; } = stream;
public long Position { get; } = position;
public StreamState NextStreamState { get; } = nextStreamState;
}
70 changes: 70 additions & 0 deletions src/KurrentDB.Client/Streams/KurrentDBClient.MultiAppend.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
using Google.Protobuf;
using KurrentDB.Client.Core.Serialization;
using KurrentDB.Protocol.V2;

namespace KurrentDB.Client;

public partial class KurrentDBClient {
/// <summary>
/// Appends events asynchronously to a stream. Messages are serialized using default or custom serialization configured through <see cref="KurrentDBClientSettings"/>
/// </summary>
/// <param name="requests">Messages to append to the stream.</param>
/// <param name="options">Optional settings for the append operation, e.g. deadline, user credentials etc.</param>
/// <param name="token">The optional <see cref="System.Threading.CancellationToken"/>.</param>
/// <returns></returns>
public Task<MultiStreamWriteResult> MultiAppend(
IEnumerable<AppendRequest> requests,
AppendToStreamOptions? options = null,
CancellationToken token = default
) {
return MultiAppendInternal(options, requests, token).AsTask();
}

async ValueTask<MultiStreamWriteResult> MultiAppendInternal(
AppendToStreamOptions? options, IEnumerable<AppendRequest> requests, CancellationToken token
) {
var channelInfo = await GetChannelInfo(token).ConfigureAwait(false);
var client = new StreamsService.StreamsServiceClient(channelInfo.CallInvoker).MultiStreamAppendSession(
KurrentDBCallOptions.CreateStreaming(
Settings,
userCredentials: Settings.DefaultCredentials,
cancellationToken: token
)
);

foreach (var request in GetRequests(options, requests, token))
await client.RequestStream.WriteAsync(request).ConfigureAwait(false);

await client.RequestStream.CompleteAsync().ConfigureAwait(false);
return new MultiStreamWriteResult(await client.ResponseAsync);
}

IEnumerable<AppendStreamRequest> GetRequests(
AppendToStreamOptions? options,
IEnumerable<AppendRequest> requests,
CancellationToken token
) {
foreach (var request in requests) {
token.ThrowIfCancellationRequested();

var serializationContext =
new MessageSerializationContext(MessageTypeNamingResolutionContext.FromStreamName(request.StreamName));

var messages = _messageSerializer.With(options?.SerializationSettings)
.Serialize(request.Messages, serializationContext);


yield return new AppendStreamRequest {
Stream = request.StreamName,
ExpectedRevision = request.ExpectedState.ToInt64(),
Records = {
messages.Select(x => new AppendRecord {
RecordId = x.MessageId.ToString(),
Data = ByteString.CopyFrom(x.Data.Span),
}
)
}
};
}
}
}
38 changes: 38 additions & 0 deletions src/KurrentDB.Client/Streams/MultiStreamWriteResult.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
using JetBrains.Annotations;
using KurrentDB.Protocol.V2;

namespace KurrentDB.Client;

[PublicAPI]
public class MultiStreamWriteResult {
readonly List<AppendStreamSuccess>? _successes;
public List<AppendStreamFailure> Failures { get; } = [];

internal MultiStreamWriteResult(MultiStreamAppendResponse response) {
if (response.ResultCase == MultiStreamAppendResponse.ResultOneofCase.Success) {
_successes = new List<AppendStreamSuccess>(response.Success.Output.Count);

foreach (var success in response.Success.Output)
_successes.Add(
new AppendStreamSuccess(success.Stream, success.Position, new StreamState(success.StreamRevision))
);
} else {
_successes = null;
Failures = new List<AppendStreamFailure>(response.Failure.Output.Count);

foreach (var failure in response.Failure.Output)
Failures.Add(new AppendStreamFailure(failure));
}
}

public bool TryGetSuccesses(out List<AppendStreamSuccess> successes) {
successes = [];

if (_successes == null)
return false;

successes = _successes;
return true;

}
}
Loading