Skip to content

feat: supports new caught up and fell behind events. #342

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/KurrentDB.Client/Core/protos/shared.proto
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ message MaximumAppendSizeExceeded {
uint32 maxAppendSize = 1;
}

message MaximumAppendEventSizeExceeded {
string eventId = 1;
uint32 proposedEventSize = 2;
uint32 maxAppendEventSize = 3;
}

message BadRequest {
string message = 1;
}
41 changes: 39 additions & 2 deletions src/KurrentDB.Client/Core/protos/streams.proto
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,37 @@ message ReadResp {
FellBehind fell_behind = 9;
}

message CaughtUp {}
// The $all or stream subscription has caught up and become live.
message CaughtUp {
// Current time in the server when the subscription caught up
google.protobuf.Timestamp timestamp = 1;

// Checkpoint for resuming a stream subscription.
// For stream subscriptions it is populated unless the stream is empty.
// For $all subscriptions it is not populated.
optional int64 stream_revision = 2;

// Checkpoint for resuming a $all subscription.
// For stream subscriptions it is not populated.
// For $all subscriptions it is populated unless the database is empty.
optional Position position = 3;
}

// The $all or stream subscription has fallen back into catchup mode and is no longer live.
message FellBehind {
// Current time in the server when the subscription fell behind
google.protobuf.Timestamp timestamp = 1;

// Checkpoint for resuming a stream subscription.
// For stream subscriptions it is populated unless the stream is empty.
// For $all subscriptions it is not populated.
optional int64 stream_revision = 2;

message FellBehind {}
// Checkpoint for resuming a $all subscription.
// For stream subscriptions it is not populated.
// For $all subscriptions it is populated unless the database is empty.
optional Position position = 3;
}

message ReadEvent {
RecordedEvent event = 1;
Expand All @@ -132,7 +160,16 @@ message ReadResp {
message Checkpoint {
uint64 commit_position = 1;
uint64 prepare_position = 2;

// Current time in the server when the checkpoint was reached
google.protobuf.Timestamp timestamp = 3;
}

message Position {
uint64 commit_position = 1;
uint64 prepare_position = 2;
}

message StreamNotFound {
event_store.client.StreamIdentifier stream_identifier = 1;
}
Expand Down
14 changes: 12 additions & 2 deletions src/KurrentDB.Client/Streams/KurrentDBClient.Subscriptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,18 @@ async Task PumpMessages() {
response.Checkpoint.PreparePosition
)
),
CaughtUp => StreamMessage.CaughtUp.Instance,
FellBehind => StreamMessage.FellBehind.Instance,
CaughtUp => response.CaughtUp.Timestamp == null
? StreamMessage.CaughtUp.Empty
: new StreamMessage.CaughtUp(
response.CaughtUp.Timestamp.ToDateTime(),
response.CaughtUp.StreamRevision,
new Position(response.CaughtUp.Position.CommitPosition, response.CaughtUp.Position.PreparePosition)),
FellBehind => response.FellBehind.Timestamp == null
? StreamMessage.FellBehind.Empty
: new StreamMessage.FellBehind(
response.FellBehind.Timestamp.ToDateTime(),
response.FellBehind.StreamRevision,
new Position(response.FellBehind.Position.CommitPosition, response.FellBehind.Position.PreparePosition)),
_ => StreamMessage.Unknown.Instance
};

Expand Down
8 changes: 4 additions & 4 deletions src/KurrentDB.Client/Streams/StreamMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,15 @@ public record StreamCheckpointReached(StreamPosition StreamPosition) : StreamMes
/// <summary>
/// A <see cref="KurrentDB.Client.StreamMessage"/> indicating that the subscription is live.
/// </summary>
public record CaughtUp : StreamMessage {
internal static readonly CaughtUp Instance = new();
public record CaughtUp(DateTime? Date, long? StreamRevision, Position? Position) : StreamMessage {
internal static readonly CaughtUp Empty = new(null, null, null);
}

/// <summary>
/// A <see cref="KurrentDB.Client.StreamMessage"/> indicating that the subscription has switched to catch up mode.
/// </summary>
public record FellBehind : StreamMessage {
internal static readonly FellBehind Instance = new();
public record FellBehind(DateTime? Date, long? StreamRevision, Position? Position): StreamMessage {
internal static readonly FellBehind Empty = new(null, null, null);
}

/// <summary>
Expand Down
Loading