Skip to content

Commit

Permalink
Reproduction for events being applied out of sequence when rebuilding…
Browse files Browse the repository at this point in the history
… a MultiStreamAggregation (#2257)

* Add failing test for applying events out of sequence when rebuilding a MultiStreamAggregation
* Fix events out of sequence with async multi-stream aggregation
* Few tweaks to unit test

Co-authored-by: mysticmind <[email protected]>
  • Loading branch information
T0shik and mysticmind authored May 31, 2022
1 parent b942cb8 commit 0076794
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 1 deletion.
67 changes: 67 additions & 0 deletions src/Marten.AsyncDaemon.Testing/MultiStreamAggregationTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Marten.Events;
using Marten.Events.Projections;
using Marten.Testing.Harness;
using Xunit;

namespace Marten.AsyncDaemon.Testing;

public class MultiStreamAggregationTests: OneOffConfigurationsContext
{
[Fact]
public async Task events_applied_in_sequence_across_streams()
{
StoreOptions(opts => opts.Projections.Add<Projector>(ProjectionLifecycle.Inline));

var commonId = Guid.NewGuid();

using var daemon = await theStore.BuildProjectionDaemonAsync();
await using var session = theStore.LightweightSession();

session.Events.StartStream(commonId, new Happened() { Id = commonId }, new Happened() { Id = commonId });
await session.SaveChangesAsync();

session.Events.StartStream(new Happened() { Id = commonId }, new Happened() { Id = commonId });
await session.SaveChangesAsync();

var projection = await session.LoadAsync<Projection>(commonId);
var eventSequenceList = new List<long> { 1, 2, 3, 4 };
projection.ShouldNotBeNull();
projection.EventSequenceList.ShouldHaveTheSameElementsAs(eventSequenceList);

await daemon.RebuildProjection<Projector>(CancellationToken.None);
projection = await session.LoadAsync<Projection>(commonId);
projection.ShouldNotBeNull();
projection.EventSequenceList.ShouldHaveTheSameElementsAs(eventSequenceList);
}

public interface ICommonId
{
public Guid Id { get; set; }
}

public class Happened: ICommonId
{
public Guid Id { get; set; }
}

public class Projection
{
public Guid Id { get; set; }
public IList<long> EventSequenceList { get; set; } = new List<long>();
}

public class Projector: MultiStreamAggregation<Projection, Guid>
{
public Projector()
{
Identity<ICommonId>(x => x.Id);
}

public void Apply(Projection p, IEvent<Happened> e) => p.EventSequenceList.Add(e.Sequence);
}
}
2 changes: 1 addition & 1 deletion src/Marten/Events/Aggregation/EventSlice.cs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ internal void ApplyFanOutRules(IEnumerable<IFanOutRule> rules)

private void reorderEvents()
{
var events = _events.Distinct().OrderBy(x => x.Version).ToArray();
var events = _events.Distinct().OrderBy(x => x.Sequence).ToArray();
_events.Clear();
_events.AddRange(events);
}
Expand Down

0 comments on commit 0076794

Please sign in to comment.