Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid wasted db round trips by not loading single stream projections for new streams #3311

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/Marten/Events/Aggregation/AggregationRuntime.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ public async ValueTask ApplyChangesAsync(DocumentSessionBase session,
}

var aggregate = slice.Aggregate;
if (slice.Aggregate == null && lifecycle == ProjectionLifecycle.Inline)
// do not load if sliced by stream and the stream does not yet exist
if (slice.Aggregate == null && lifecycle == ProjectionLifecycle.Inline && (Slicer is not ISingleStreamSlicer || slice.ActionType != StreamActionType.Start))
{
aggregate = await Storage.LoadAsync(slice.Id, session, cancellation).ConfigureAwait(false);
}
Expand Down
2 changes: 1 addition & 1 deletion src/Marten/Events/Aggregation/ByStreamId.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public ValueTask<IReadOnlyList<EventSlice<TDoc, Guid>>> SliceInlineActions(IQuer
return new ValueTask<IReadOnlyList<EventSlice<TDoc, Guid>>>(streams.Select(s =>
{
var tenant = new Tenant(s.TenantId, querySession.Database);
return new EventSlice<TDoc, Guid>(s.Id, tenant, s.Events);
return new EventSlice<TDoc, Guid>(s.Id, tenant, s.Events){ActionType = s.ActionType};
}).ToList());
}

Expand Down
2 changes: 1 addition & 1 deletion src/Marten/Events/Aggregation/ByStreamKey.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public ValueTask<IReadOnlyList<EventSlice<TDoc, string>>> SliceInlineActions(IQu
return new ValueTask<IReadOnlyList<EventSlice<TDoc, string>>>(streams.Select(s =>
{
var tenant = new Tenant(s.TenantId, querySession.Database);
return new EventSlice<TDoc, string>(s.Key!, tenant, s.Events);
return new EventSlice<TDoc, string>(s.Key!, tenant, s.Events){ActionType = s.ActionType};
}).ToList());
}

Expand Down
8 changes: 5 additions & 3 deletions src/Marten/Events/Aggregation/CustomProjection.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Threading;
using System.Threading.Tasks;
using JasperFx.CodeGeneration;
using JasperFx.Core.Reflection;
using Marten.Events.Daemon;
using Marten.Events.Daemon.Internals;
Expand Down Expand Up @@ -64,7 +62,11 @@ async Task IProjection.ApplyAsync(IDocumentOperations operations, IReadOnlyList<
{
var tenantedSession = martenSession.UseTenancyBasedOnSliceAndStorage(storage, slice);

slice.Aggregate = await storage.LoadAsync(slice.Id, tenantedSession, cancellation).ConfigureAwait(false);
// do not load if sliced by stream and the stream does not yet exist
if (Slicer is not ISingleStreamSlicer || slice.ActionType != StreamActionType.Start)
{
slice.Aggregate = await storage.LoadAsync(slice.Id, tenantedSession, cancellation).ConfigureAwait(false);
}
await ApplyChangesAsync(tenantedSession, slice, cancellation).ConfigureAwait(false);
}
}
Expand Down
14 changes: 12 additions & 2 deletions src/Marten/Events/Aggregation/EventSlice.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,22 @@ public EventSlice(TId id, IQuerySession querySession, IEnumerable<IEvent>? event
{
}

private readonly StreamActionType? _actionType;

/// <summary>
/// Is this action the start of a new stream or appending
/// to an existing stream?
/// </summary>
public StreamActionType ActionType => _events[0].Version == 1 ? StreamActionType.Start : StreamActionType.Append;

/// <remarks>
/// Default's to determining from the version of the first event on
/// stream, but can be overridden so that the value works with
/// QuickAppend
/// </remarks>
public StreamActionType ActionType
{
get => _actionType ?? (_events[0].Version == 1 ? StreamActionType.Start : StreamActionType.Append);
init => _actionType = value;
}

/// <summary>
/// The aggregate identity
Expand Down
Loading