From ee1010a2780aa4d3145490d305c75b5c0596faa4 Mon Sep 17 00:00:00 2001 From: Joe Nathan Abellard Date: Fri, 27 May 2022 09:57:09 -0400 Subject: [PATCH] Prevent append operation on archived streams (#2244) * Prevent append operation on archived stream * Add integration tests to ensure appending events to an archived stream is not allowed --- docs/diagnostics.md | 2 +- docs/events/archiving.md | 6 +-- src/EventSourcingTests/archiving_events.cs | 40 +++++++++++++++++++ src/Marten/Events/EventGraph.Processing.cs | 9 +++++ .../InvalidStreamOperationException.cs | 13 ++++++ 5 files changed, 66 insertions(+), 4 deletions(-) create mode 100644 src/Marten/Exceptions/InvalidStreamOperationException.cs diff --git a/docs/diagnostics.md b/docs/diagnostics.md index 7a83e67699..76c8d09c6c 100644 --- a/docs/diagnostics.md +++ b/docs/diagnostics.md @@ -362,7 +362,7 @@ The `IMartenLogger` can be swapped out on any `IQuerySession` or `IDocumentSessi // session to pipe Marten logging to the xUnit.Net output theSession.Logger = new TestOutputMartenLogger(_output); ``` -snippet source | anchor +snippet source | anchor ## Previewing the PostgreSQL Query Plan diff --git a/docs/events/archiving.md b/docs/events/archiving.md index 01b341834a..4baf1c0b44 100644 --- a/docs/events/archiving.md +++ b/docs/events/archiving.md @@ -16,7 +16,7 @@ public async Task SampleArchive(IDocumentSession session, string streamId) await session.SaveChangesAsync(); } ``` -snippet source | anchor +snippet source | anchor As in all cases with an `IDocumentSession`, you need to call `SaveChanges()` to commit the @@ -35,7 +35,7 @@ var events = await theSession.Events .Where(x => x.IsArchived) .ToListAsync(); ``` -snippet source | anchor +snippet source | anchor You can also query for all events both archived and not archived with `MaybeArchived()` @@ -47,5 +47,5 @@ like so: var events = await theSession.Events.QueryAllRawEvents() .Where(x => x.MaybeArchived()).ToListAsync(); ``` -snippet source | anchor +snippet source | anchor diff --git a/src/EventSourcingTests/archiving_events.cs b/src/EventSourcingTests/archiving_events.cs index c4cc499d2b..39c93600ff 100644 --- a/src/EventSourcingTests/archiving_events.cs +++ b/src/EventSourcingTests/archiving_events.cs @@ -5,6 +5,7 @@ using Marten; using Marten.Events; using Marten.Events.Archiving; +using Marten.Exceptions; using Marten.Testing.Harness; using Shouldly; using Weasel.Core; @@ -237,5 +238,44 @@ public async Task query_by_a_specific_event_filters_out_archived_events_by_defau events.All(x => x.Tracker != aEvent2.Tracker).ShouldBeTrue(); } + [Fact] + public async Task prevent_append_operation_for_archived_stream_on_sync_commit() + { + await theStore.Advanced.Clean.DeleteAllEventDataAsync(); + + var streamId = Guid.NewGuid(); + theSession.Events.Append(streamId, new AEvent()); + await theSession.SaveChangesAsync(); + + theSession.Events.ArchiveStream(streamId); + await theSession.SaveChangesAsync(); + + theSession.Events.Append(streamId, new BEvent()); + var thrownException = Should.Throw( () => + { + theSession.SaveChanges(); + }); + thrownException.Message.ShouldBe($"Attempted to append event to archived stream with Id '{streamId}'."); + } + + [Fact] + public async Task prevent_append_operation_for_archived_stream_on_async_commit() + { + await theStore.Advanced.Clean.DeleteAllEventDataAsync(); + + var streamId = Guid.NewGuid(); + theSession.Events.Append(streamId, new AEvent()); + await theSession.SaveChangesAsync(); + + theSession.Events.ArchiveStream(streamId); + await theSession.SaveChangesAsync(); + + theSession.Events.Append(streamId, new BEvent()); + var thrownException = Should.Throw( async () => + { + await theSession.SaveChangesAsync(); + }); + thrownException.Message.ShouldBe($"Attempted to append event to archived stream with Id '{streamId}'."); + } } } diff --git a/src/Marten/Events/EventGraph.Processing.cs b/src/Marten/Events/EventGraph.Processing.cs index eed65cd486..89674544e6 100644 --- a/src/Marten/Events/EventGraph.Processing.cs +++ b/src/Marten/Events/EventGraph.Processing.cs @@ -3,6 +3,7 @@ using System.Threading; using System.Threading.Tasks; using Marten.Events.Operations; +using Marten.Exceptions; using Marten.Internal; using Marten.Internal.Operations; using Marten.Internal.Sessions; @@ -52,6 +53,10 @@ internal void ProcessEvents(DocumentSessionBase session) } else { + if (state.IsArchived) + { + throw new InvalidStreamOperationException($"Attempted to append event to archived stream with Id '{state.Id}'."); + } stream.PrepareEvents(state.Version, this, sequences, session); session.QueueOperation(storage.UpdateStreamVersion(stream)); } @@ -108,6 +113,10 @@ internal async Task ProcessEventsAsync(DocumentSessionBase session, Cancellation } else { + if (state.IsArchived) + { + throw new InvalidStreamOperationException($"Attempted to append event to archived stream with Id '{state.Id}'."); + } stream.PrepareEvents(state.Version, this, sequences, session); session.QueueOperation(storage.UpdateStreamVersion(stream)); } diff --git a/src/Marten/Exceptions/InvalidStreamOperationException.cs b/src/Marten/Exceptions/InvalidStreamOperationException.cs new file mode 100644 index 0000000000..219c4870d5 --- /dev/null +++ b/src/Marten/Exceptions/InvalidStreamOperationException.cs @@ -0,0 +1,13 @@ +using System; + +namespace Marten.Exceptions +{ + public class InvalidStreamOperationException: Exception + { + public InvalidStreamOperationException(string message): + base(message) + { + + } + } +}