diff --git a/src/KurrentDB.Client/Core/protos/shared.proto b/src/KurrentDB.Client/Core/protos/shared.proto
index 24780afc3..1bf55ef26 100644
--- a/src/KurrentDB.Client/Core/protos/shared.proto
+++ b/src/KurrentDB.Client/Core/protos/shared.proto
@@ -56,6 +56,12 @@ message MaximumAppendSizeExceeded {
uint32 maxAppendSize = 1;
}
+message MaximumAppendEventSizeExceeded {
+ string eventId = 1;
+ uint32 proposedEventSize = 2;
+ uint32 maxAppendEventSize = 3;
+}
+
message BadRequest {
string message = 1;
}
diff --git a/src/KurrentDB.Client/Core/protos/streams.proto b/src/KurrentDB.Client/Core/protos/streams.proto
index 0eb05295c..4ff613a73 100644
--- a/src/KurrentDB.Client/Core/protos/streams.proto
+++ b/src/KurrentDB.Client/Core/protos/streams.proto
@@ -103,9 +103,37 @@ message ReadResp {
FellBehind fell_behind = 9;
}
- message CaughtUp {}
+ // The $all or stream subscription has caught up and become live.
+ message CaughtUp {
+ // Current time in the server when the subscription caught up
+ google.protobuf.Timestamp timestamp = 1;
+
+ // Checkpoint for resuming a stream subscription.
+ // For stream subscriptions it is populated unless the stream is empty.
+ // For $all subscriptions it is not populated.
+ optional int64 stream_revision = 2;
+
+ // Checkpoint for resuming a $all subscription.
+ // For stream subscriptions it is not populated.
+ // For $all subscriptions it is populated unless the database is empty.
+ optional Position position = 3;
+ }
+
+ // The $all or stream subscription has fallen back into catchup mode and is no longer live.
+ message FellBehind {
+ // Current time in the server when the subscription fell behind
+ google.protobuf.Timestamp timestamp = 1;
+
+ // Checkpoint for resuming a stream subscription.
+ // For stream subscriptions it is populated unless the stream is empty.
+ // For $all subscriptions it is not populated.
+ optional int64 stream_revision = 2;
- message FellBehind {}
+ // Checkpoint for resuming a $all subscription.
+ // For stream subscriptions it is not populated.
+ // For $all subscriptions it is populated unless the database is empty.
+ optional Position position = 3;
+ }
message ReadEvent {
RecordedEvent event = 1;
@@ -132,7 +160,16 @@ message ReadResp {
message Checkpoint {
uint64 commit_position = 1;
uint64 prepare_position = 2;
+
+ // Current time in the server when the checkpoint was reached
+ google.protobuf.Timestamp timestamp = 3;
}
+
+ message Position {
+ uint64 commit_position = 1;
+ uint64 prepare_position = 2;
+ }
+
message StreamNotFound {
event_store.client.StreamIdentifier stream_identifier = 1;
}
diff --git a/src/KurrentDB.Client/Streams/KurrentDBClient.Subscriptions.cs b/src/KurrentDB.Client/Streams/KurrentDBClient.Subscriptions.cs
index 28d72d3cd..e7e307403 100644
--- a/src/KurrentDB.Client/Streams/KurrentDBClient.Subscriptions.cs
+++ b/src/KurrentDB.Client/Streams/KurrentDBClient.Subscriptions.cs
@@ -226,8 +226,18 @@ async Task PumpMessages() {
response.Checkpoint.PreparePosition
)
),
- CaughtUp => StreamMessage.CaughtUp.Instance,
- FellBehind => StreamMessage.FellBehind.Instance,
+ CaughtUp => response.CaughtUp.Timestamp == null
+ ? StreamMessage.CaughtUp.Empty
+ : new StreamMessage.CaughtUp(
+ response.CaughtUp.Timestamp.ToDateTime(),
+ response.CaughtUp.StreamRevision,
+ new Position(response.CaughtUp.Position.CommitPosition, response.CaughtUp.Position.PreparePosition)),
+ FellBehind => response.FellBehind.Timestamp == null
+ ? StreamMessage.FellBehind.Empty
+ : new StreamMessage.FellBehind(
+ response.FellBehind.Timestamp.ToDateTime(),
+ response.FellBehind.StreamRevision,
+ new Position(response.FellBehind.Position.CommitPosition, response.FellBehind.Position.PreparePosition)),
_ => StreamMessage.Unknown.Instance
};
diff --git a/src/KurrentDB.Client/Streams/StreamMessage.cs b/src/KurrentDB.Client/Streams/StreamMessage.cs
index 6c619cd11..37d8ecfb2 100644
--- a/src/KurrentDB.Client/Streams/StreamMessage.cs
+++ b/src/KurrentDB.Client/Streams/StreamMessage.cs
@@ -62,15 +62,15 @@ public record StreamCheckpointReached(StreamPosition StreamPosition) : StreamMes
///
/// A indicating that the subscription is live.
///
- public record CaughtUp : StreamMessage {
- internal static readonly CaughtUp Instance = new();
+ public record CaughtUp(DateTime? Date, long? StreamRevision, Position? Position) : StreamMessage {
+ internal static readonly CaughtUp Empty = new(null, null, null);
}
///
/// A indicating that the subscription has switched to catch up mode.
///
- public record FellBehind : StreamMessage {
- internal static readonly FellBehind Instance = new();
+ public record FellBehind(DateTime? Date, long? StreamRevision, Position? Position): StreamMessage {
+ internal static readonly FellBehind Empty = new(null, null, null);
}
///