Skip to content

Commit

Permalink
Prevent append operation on archived streams (#2244)
Browse files Browse the repository at this point in the history
* Prevent append operation on archived stream
* Add integration tests to ensure appending events to an archived stream is not allowed
  • Loading branch information
jabellard authored May 27, 2022
1 parent f01b4b7 commit ee1010a
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 4 deletions.
2 changes: 1 addition & 1 deletion docs/diagnostics.md
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ The `IMartenLogger` can be swapped out on any `IQuerySession` or `IDocumentSessi
// session to pipe Marten logging to the xUnit.Net output
theSession.Logger = new TestOutputMartenLogger(_output);
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/archiving_events.cs#L226-L232' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_replacing_logger_per_session' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/archiving_events.cs#L227-L233' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_replacing_logger_per_session' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

## Previewing the PostgreSQL Query Plan
Expand Down
6 changes: 3 additions & 3 deletions docs/events/archiving.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public async Task SampleArchive(IDocumentSession session, string streamId)
await session.SaveChangesAsync();
}
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/archiving_events.cs#L26-L34' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_archive_stream_usage' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/archiving_events.cs#L27-L35' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_archive_stream_usage' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

As in all cases with an `IDocumentSession`, you need to call `SaveChanges()` to commit the
Expand All @@ -35,7 +35,7 @@ var events = await theSession.Events
.Where(x => x.IsArchived)
.ToListAsync();
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/archiving_events.cs#L161-L168' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_querying_for_archived_events' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/archiving_events.cs#L162-L169' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_querying_for_archived_events' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

You can also query for all events both archived and not archived with `MaybeArchived()`
Expand All @@ -47,5 +47,5 @@ like so:
var events = await theSession.Events.QueryAllRawEvents()
.Where(x => x.MaybeArchived()).ToListAsync();
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/archiving_events.cs#L192-L197' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_query_for_maybe_archived_events' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/archiving_events.cs#L193-L198' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_query_for_maybe_archived_events' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->
40 changes: 40 additions & 0 deletions src/EventSourcingTests/archiving_events.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using Marten;
using Marten.Events;
using Marten.Events.Archiving;
using Marten.Exceptions;
using Marten.Testing.Harness;
using Shouldly;
using Weasel.Core;
Expand Down Expand Up @@ -237,5 +238,44 @@ public async Task query_by_a_specific_event_filters_out_archived_events_by_defau
events.All(x => x.Tracker != aEvent2.Tracker).ShouldBeTrue();
}

[Fact]
public async Task prevent_append_operation_for_archived_stream_on_sync_commit()
{
await theStore.Advanced.Clean.DeleteAllEventDataAsync();

var streamId = Guid.NewGuid();
theSession.Events.Append(streamId, new AEvent());
await theSession.SaveChangesAsync();

theSession.Events.ArchiveStream(streamId);
await theSession.SaveChangesAsync();

theSession.Events.Append(streamId, new BEvent());
var thrownException = Should.Throw<InvalidStreamOperationException>( () =>
{
theSession.SaveChanges();
});
thrownException.Message.ShouldBe($"Attempted to append event to archived stream with Id '{streamId}'.");
}

[Fact]
public async Task prevent_append_operation_for_archived_stream_on_async_commit()
{
await theStore.Advanced.Clean.DeleteAllEventDataAsync();

var streamId = Guid.NewGuid();
theSession.Events.Append(streamId, new AEvent());
await theSession.SaveChangesAsync();

theSession.Events.ArchiveStream(streamId);
await theSession.SaveChangesAsync();

theSession.Events.Append(streamId, new BEvent());
var thrownException = Should.Throw<InvalidStreamOperationException>( async () =>
{
await theSession.SaveChangesAsync();
});
thrownException.Message.ShouldBe($"Attempted to append event to archived stream with Id '{streamId}'.");
}
}
}
9 changes: 9 additions & 0 deletions src/Marten/Events/EventGraph.Processing.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Threading;
using System.Threading.Tasks;
using Marten.Events.Operations;
using Marten.Exceptions;
using Marten.Internal;
using Marten.Internal.Operations;
using Marten.Internal.Sessions;
Expand Down Expand Up @@ -52,6 +53,10 @@ internal void ProcessEvents(DocumentSessionBase session)
}
else
{
if (state.IsArchived)
{
throw new InvalidStreamOperationException($"Attempted to append event to archived stream with Id '{state.Id}'.");
}
stream.PrepareEvents(state.Version, this, sequences, session);
session.QueueOperation(storage.UpdateStreamVersion(stream));
}
Expand Down Expand Up @@ -108,6 +113,10 @@ internal async Task ProcessEventsAsync(DocumentSessionBase session, Cancellation
}
else
{
if (state.IsArchived)
{
throw new InvalidStreamOperationException($"Attempted to append event to archived stream with Id '{state.Id}'.");
}
stream.PrepareEvents(state.Version, this, sequences, session);
session.QueueOperation(storage.UpdateStreamVersion(stream));
}
Expand Down
13 changes: 13 additions & 0 deletions src/Marten/Exceptions/InvalidStreamOperationException.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
using System;

namespace Marten.Exceptions
{
public class InvalidStreamOperationException: Exception
{
public InvalidStreamOperationException(string message):
base(message)
{

}
}
}

0 comments on commit ee1010a

Please sign in to comment.