From 681a4bdb2847cf34203f4078851fdf854c5a10d6 Mon Sep 17 00:00:00 2001 From: YoEight Date: Fri, 25 Apr 2025 22:25:44 -0400 Subject: [PATCH 1/2] feat: supports new caught up and fell behind events. --- src/KurrentDB.Client/Core/protos/shared.proto | 8 +++- .../Core/protos/streams.proto | 41 ++++++++++++++++++- .../Streams/KurrentDBClient.Subscriptions.cs | 14 ++++++- src/KurrentDB.Client/Streams/StreamMessage.cs | 8 ++-- 4 files changed, 62 insertions(+), 9 deletions(-) diff --git a/src/KurrentDB.Client/Core/protos/shared.proto b/src/KurrentDB.Client/Core/protos/shared.proto index 24780afc3..d5335abbd 100644 --- a/src/KurrentDB.Client/Core/protos/shared.proto +++ b/src/KurrentDB.Client/Core/protos/shared.proto @@ -1,6 +1,6 @@ syntax = "proto3"; package event_store.client; -option java_package = "io.kurrent.dbclient.proto.shared"; +option java_package = "com.eventstore.dbclient.proto.shared"; import "google/protobuf/empty.proto"; message UUID { @@ -56,6 +56,12 @@ message MaximumAppendSizeExceeded { uint32 maxAppendSize = 1; } +message MaximumAppendEventSizeExceeded { + string eventId = 1; + uint32 proposedEventSize = 2; + uint32 maxAppendEventSize = 3; +} + message BadRequest { string message = 1; } diff --git a/src/KurrentDB.Client/Core/protos/streams.proto b/src/KurrentDB.Client/Core/protos/streams.proto index 0eb05295c..4ff613a73 100644 --- a/src/KurrentDB.Client/Core/protos/streams.proto +++ b/src/KurrentDB.Client/Core/protos/streams.proto @@ -103,9 +103,37 @@ message ReadResp { FellBehind fell_behind = 9; } - message CaughtUp {} + // The $all or stream subscription has caught up and become live. + message CaughtUp { + // Current time in the server when the subscription caught up + google.protobuf.Timestamp timestamp = 1; + + // Checkpoint for resuming a stream subscription. + // For stream subscriptions it is populated unless the stream is empty. + // For $all subscriptions it is not populated. + optional int64 stream_revision = 2; + + // Checkpoint for resuming a $all subscription. + // For stream subscriptions it is not populated. + // For $all subscriptions it is populated unless the database is empty. + optional Position position = 3; + } + + // The $all or stream subscription has fallen back into catchup mode and is no longer live. + message FellBehind { + // Current time in the server when the subscription fell behind + google.protobuf.Timestamp timestamp = 1; + + // Checkpoint for resuming a stream subscription. + // For stream subscriptions it is populated unless the stream is empty. + // For $all subscriptions it is not populated. + optional int64 stream_revision = 2; - message FellBehind {} + // Checkpoint for resuming a $all subscription. + // For stream subscriptions it is not populated. + // For $all subscriptions it is populated unless the database is empty. + optional Position position = 3; + } message ReadEvent { RecordedEvent event = 1; @@ -132,7 +160,16 @@ message ReadResp { message Checkpoint { uint64 commit_position = 1; uint64 prepare_position = 2; + + // Current time in the server when the checkpoint was reached + google.protobuf.Timestamp timestamp = 3; } + + message Position { + uint64 commit_position = 1; + uint64 prepare_position = 2; + } + message StreamNotFound { event_store.client.StreamIdentifier stream_identifier = 1; } diff --git a/src/KurrentDB.Client/Streams/KurrentDBClient.Subscriptions.cs b/src/KurrentDB.Client/Streams/KurrentDBClient.Subscriptions.cs index 28d72d3cd..e7e307403 100644 --- a/src/KurrentDB.Client/Streams/KurrentDBClient.Subscriptions.cs +++ b/src/KurrentDB.Client/Streams/KurrentDBClient.Subscriptions.cs @@ -226,8 +226,18 @@ async Task PumpMessages() { response.Checkpoint.PreparePosition ) ), - CaughtUp => StreamMessage.CaughtUp.Instance, - FellBehind => StreamMessage.FellBehind.Instance, + CaughtUp => response.CaughtUp.Timestamp == null + ? StreamMessage.CaughtUp.Empty + : new StreamMessage.CaughtUp( + response.CaughtUp.Timestamp.ToDateTime(), + response.CaughtUp.StreamRevision, + new Position(response.CaughtUp.Position.CommitPosition, response.CaughtUp.Position.PreparePosition)), + FellBehind => response.FellBehind.Timestamp == null + ? StreamMessage.FellBehind.Empty + : new StreamMessage.FellBehind( + response.FellBehind.Timestamp.ToDateTime(), + response.FellBehind.StreamRevision, + new Position(response.FellBehind.Position.CommitPosition, response.FellBehind.Position.PreparePosition)), _ => StreamMessage.Unknown.Instance }; diff --git a/src/KurrentDB.Client/Streams/StreamMessage.cs b/src/KurrentDB.Client/Streams/StreamMessage.cs index 6c619cd11..37d8ecfb2 100644 --- a/src/KurrentDB.Client/Streams/StreamMessage.cs +++ b/src/KurrentDB.Client/Streams/StreamMessage.cs @@ -62,15 +62,15 @@ public record StreamCheckpointReached(StreamPosition StreamPosition) : StreamMes /// /// A indicating that the subscription is live. /// - public record CaughtUp : StreamMessage { - internal static readonly CaughtUp Instance = new(); + public record CaughtUp(DateTime? Date, long? StreamRevision, Position? Position) : StreamMessage { + internal static readonly CaughtUp Empty = new(null, null, null); } /// /// A indicating that the subscription has switched to catch up mode. /// - public record FellBehind : StreamMessage { - internal static readonly FellBehind Instance = new(); + public record FellBehind(DateTime? Date, long? StreamRevision, Position? Position): StreamMessage { + internal static readonly FellBehind Empty = new(null, null, null); } /// From f4e0c5d292cfaf755582898131d01ca320d2714e Mon Sep 17 00:00:00 2001 From: YoEight Date: Fri, 25 Apr 2025 22:33:53 -0400 Subject: [PATCH 2/2] fixup --- src/KurrentDB.Client/Core/protos/shared.proto | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/KurrentDB.Client/Core/protos/shared.proto b/src/KurrentDB.Client/Core/protos/shared.proto index d5335abbd..1bf55ef26 100644 --- a/src/KurrentDB.Client/Core/protos/shared.proto +++ b/src/KurrentDB.Client/Core/protos/shared.proto @@ -1,6 +1,6 @@ syntax = "proto3"; package event_store.client; -option java_package = "com.eventstore.dbclient.proto.shared"; +option java_package = "io.kurrent.dbclient.proto.shared"; import "google/protobuf/empty.proto"; message UUID {