Skip to content

Commit

Permalink
AggregateTo sets the aggregate identity. Closes GH-2891
Browse files Browse the repository at this point in the history
  • Loading branch information
jeremydmiller committed Sep 15, 2024
1 parent f057944 commit b3de518
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 1 deletion.
31 changes: 30 additions & 1 deletion src/EventSourcingTests/aggregateto_linq_operator_tests.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
using System.Collections.Generic;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using EventSourcingTests.Projections;
using Marten.Events;
using Marten.Testing.Harness;
using Shouldly;
using Xunit;

namespace EventSourcingTests;
Expand Down Expand Up @@ -81,6 +83,33 @@ public async Task can_aggregate_with_initial_state_asynchronously()
questParty.Members.ShouldHaveTheSameElementsAs("Lan", "Rand", "Matrim", "Perrin", "Elayne", "Elmindreda");
}

[Fact]
public async Task gets_the_id_set()
{
var initialParty = new QuestParty { Members = new List<string> { "Lan" } };
var id = theSession.Events.StartStream<Quest>(_joined1, _departed1).Id;
theSession.Events.StartStream<Quest>(_joined2, _departed2);
await theSession.SaveChangesAsync();

var questParty = await theSession.Events.QueryAllRawEvents().Where(x => x.StreamId == id).AggregateToAsync(initialParty);
questParty.Id.ShouldBe(id);
}

[Fact]
public async Task gets_the_key_set()
{
UseStreamIdentity(StreamIdentity.AsString);

var key = Guid.NewGuid().ToString();

theSession.Events.StartStream<Quest>(key, _joined1, _departed1);
theSession.Events.StartStream<Quest>(Guid.NewGuid().ToString(),_joined2, _departed2);
await theSession.SaveChangesAsync();

var questParty = await theSession.Events.QueryAllRawEvents().Where(x => x.StreamKey == key).AggregateToAsync<QuestPartyWithStringIdentifier>(null);
questParty.Id.ShouldBe(key);
}

public aggregateTo_linq_operator_tests(DefaultStoreFixture fixture): base(fixture)
{
theStore.Advanced.Clean.DeleteAllEventData();
Expand Down
19 changes: 19 additions & 0 deletions src/Marten/Events/AggregateToExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using JasperFx.Core.Reflection;
using Marten.Internal.Sessions;
using Marten.Linq;

namespace Marten.Events;
Expand All @@ -28,9 +31,23 @@ public static T AggregateTo<T>(this IMartenQueryable<IEvent> queryable, T state

var aggregate = aggregator.Build(queryable.ToList(), session, state);

setIdentity(session, aggregate, events);

return aggregate;
}

private static void setIdentity<T>(QuerySession session, T aggregate, IEnumerable<IEvent> events) where T : class
{
if (session.Options.Events.StreamIdentity == StreamIdentity.AsGuid)
{
session.StorageFor<T, Guid>().SetIdentity(aggregate, events.Last().StreamId);
}
else
{
session.StorageFor<T, string>().SetIdentity(aggregate, events.Last().StreamKey);
}
}

/// <summary>
/// Aggregate the events in this query to the type T
/// </summary>
Expand All @@ -53,6 +70,8 @@ public static async Task<T> AggregateToAsync<T>(this IMartenQueryable<IEvent> qu

var aggregate = await aggregator.BuildAsync(events, session, state, token).ConfigureAwait(false);

setIdentity(session, aggregate, events);

return aggregate;
}

Expand Down

0 comments on commit b3de518

Please sign in to comment.