Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Being able to specify where an event subscription starts #3146

Merged
merged 1 commit into from
Apr 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion docs/.vitepress/config.mts
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,11 @@ const config: UserConfig<DefaultTheme.Config> = {
{ text: 'Rebuilding Projections', link: '/events/projections/rebuilding' },
{ text: 'Projections and IoC Services', link: '/events/projections/ioc' },
{ text: 'Async Daemon HealthChecks', link: '/events/projections/healthchecks' },]
},
{
text: 'Event Subscriptions',
link: '/events/subscriptions'
},

{
text: 'Event Versioning',
link: '/events/versioning'
Expand Down
3 changes: 0 additions & 3 deletions docs/events/streaming/index.md

This file was deleted.

3 changes: 0 additions & 3 deletions docs/events/streaming/sink.md

This file was deleted.

3 changes: 0 additions & 3 deletions docs/events/streaming/source.md

This file was deleted.

35 changes: 35 additions & 0 deletions docs/events/subscriptions.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Event Subscriptions <Badge type="tip" text="7.7" />

Hey folks, there will be much more on this topic soon. Right now, it's in Marten > 7.6, but part of the core Marten team is
working with a client to prove out this feature. When that's done, we'll fill in the docs and sample code.

<!-- snippet: sample_ISubscription -->
<a id='snippet-sample_isubscription'></a>
```cs
/// <summary>
/// Basic abstraction for custom subscriptions to Marten events through the async daemon. Use this in
/// order to do custom processing against an ordered stream of the events
/// </summary>
public interface ISubscription : IAsyncDisposable
{
/// <summary>
/// Processes a page of events at a time
/// </summary>
/// <param name="page"></param>
/// <param name="controller">Use to log dead letter events that are skipped or to stop the subscription from processing based on an exception</param>
/// <param name="operations">Access to Marten queries and writes that will be committed with the progress update for this subscription</param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
Task<IChangeListener> ProcessEventsAsync(EventRange page, ISubscriptionController controller,
IDocumentOperations operations,
CancellationToken cancellationToken);
}
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/Marten/Subscriptions/ISubscription.cs#L9-L30' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_isubscription' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

## Registering Subscriptions

## Event Filtering

## Rewinding or Replaying Subscriptions
219 changes: 219 additions & 0 deletions src/Marten.AsyncDaemon.Testing/AsyncOptionsTests.cs
Original file line number Diff line number Diff line change
@@ -1,13 +1,23 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Marten.Events.Daemon;
using Marten.Internal.Operations;
using Marten.Storage;
using Marten.Testing.Documents;
using Marten.Testing.Harness;
using NSubstitute;
using Shouldly;
using Xunit;

namespace Marten.AsyncDaemon.Testing;

public class AsyncOptionsTests
{
private readonly IMartenDatabase theDatabase = Substitute.For<IMartenDatabase>();
private readonly ShardName theName = new ShardName("Fake", "All");
private readonly CancellationToken theToken = CancellationToken.None;

[Fact]
public void teardown_by_view_type_1()
{
Expand All @@ -22,4 +32,213 @@ public void teardown_by_view_type_1()
operations.Received().QueueOperation(new TruncateTable(typeof(Target)));
operations.Received().QueueOperation(new TruncateTable(typeof(User)));
}

[Fact]
public async Task determine_starting_position_if_rebuild()
{
var options = new AsyncOptions();
(await options.DetermineStartingPositionAsync(2000L, theName, ShardExecutionMode.Rebuild, theDatabase, theToken))
.ShouldBe(new Position(0, true));

}

[Fact]
public async Task determine_starting_position_if_continuous_and_no_other_constraints()
{
theDatabase.ProjectionProgressFor(theName, theToken).Returns(111L);

var options = new AsyncOptions();
(await options.DetermineStartingPositionAsync(2000L, theName, ShardExecutionMode.Continuous, theDatabase, theToken))
.ShouldBe(new Position(111L, false));
}

[Fact]
public async Task subscribe_from_present()
{
var options = new AsyncOptions();
options.SubscribeFromPresent();

(await options.DetermineStartingPositionAsync(2000L, theName, ShardExecutionMode.Continuous, theDatabase, theToken))
.ShouldBe(new Position(2000L, true));
}

[Fact]
public async Task do_not_match_on_database_name()
{
theDatabase.ProjectionProgressFor(theName, theToken).Returns(111L);
theDatabase.Identifier.Returns("One");

var options = new AsyncOptions();
options.SubscribeFromPresent("Two");

(await options.DetermineStartingPositionAsync(2000L, theName, ShardExecutionMode.Continuous, theDatabase, theToken))
.ShouldBe(new Position(111L, false));
}

[Fact]
public async Task do_match_on_database_name()
{
theDatabase.ProjectionProgressFor(theName, theToken).Returns(111L);
theDatabase.Identifier.Returns("One");

var options = new AsyncOptions();
options.SubscribeFromPresent("One");

(await options.DetermineStartingPositionAsync(2000L, theName, ShardExecutionMode.Continuous, theDatabase, theToken))
.ShouldBe(new Position(2000L, true));
}

[Fact]
public async Task subscribe_from_time_hit_with_no_prior()
{
theDatabase.ProjectionProgressFor(theName, theToken).Returns(0);
theDatabase.Identifier.Returns("One");

var subscriptionTime = (DateTimeOffset)DateTime.Today;

theDatabase.FindEventStoreFloorAtTimeAsync(subscriptionTime, theToken).Returns(222L);

var options = new AsyncOptions();
options.SubscribeFromTime(subscriptionTime);

(await options.DetermineStartingPositionAsync(2000L, theName, ShardExecutionMode.Continuous, theDatabase, theToken))
.ShouldBe(new Position(222L, true));
}

[Fact]
public async Task subscribe_from_time_miss_with_no_prior()
{
theDatabase.ProjectionProgressFor(theName, theToken).Returns(0);
theDatabase.Identifier.Returns("One");

var subscriptionTime = (DateTimeOffset)DateTime.Today;

theDatabase.FindEventStoreFloorAtTimeAsync(subscriptionTime, theToken).Returns((long?)null);

var options = new AsyncOptions();
options.SubscribeFromTime(subscriptionTime);

(await options.DetermineStartingPositionAsync(2000L, theName, ShardExecutionMode.Continuous, theDatabase, theToken))
.ShouldBe(new Position(0, false));
}

[Fact]
public async Task subscribe_from_time_hit_with_prior_lower_than_threshold()
{
theDatabase.ProjectionProgressFor(theName, theToken).Returns(200L);
theDatabase.Identifier.Returns("One");

var subscriptionTime = (DateTimeOffset)DateTime.Today;

theDatabase.FindEventStoreFloorAtTimeAsync(subscriptionTime, theToken).Returns(222L);

var options = new AsyncOptions();
options.SubscribeFromTime(subscriptionTime);

(await options.DetermineStartingPositionAsync(2000L, theName, ShardExecutionMode.Continuous, theDatabase, theToken))
.ShouldBe(new Position(222L, true));
}

[Fact]
public async Task subscribe_from_time_hit_with_prior_higher_than_threshold()
{
theDatabase.ProjectionProgressFor(theName, theToken).Returns(500L);
theDatabase.Identifier.Returns("One");

var subscriptionTime = (DateTimeOffset)DateTime.Today;

theDatabase.FindEventStoreFloorAtTimeAsync(subscriptionTime, theToken).Returns(222L);

var options = new AsyncOptions();
options.SubscribeFromTime(subscriptionTime);

(await options.DetermineStartingPositionAsync(2000L, theName, ShardExecutionMode.Continuous, theDatabase, theToken))
.ShouldBe(new Position(500L, false));
}

[Fact]
public async Task subscribe_from_time_hit_with_prior_higher_than_threshold_and_rebuild()
{
theDatabase.ProjectionProgressFor(theName, theToken).Returns(500L);
theDatabase.Identifier.Returns("One");

var subscriptionTime = (DateTimeOffset)DateTime.Today;

theDatabase.FindEventStoreFloorAtTimeAsync(subscriptionTime, theToken).Returns(222L);

var options = new AsyncOptions();
options.SubscribeFromTime(subscriptionTime);

(await options.DetermineStartingPositionAsync(2000L, theName, ShardExecutionMode.Rebuild, theDatabase, theToken))
.ShouldBe(new Position(222L, true));
}

[Fact]
public async Task subscribe_from_sequence_hit_with_no_prior()
{
theDatabase.ProjectionProgressFor(theName, theToken).Returns(100);
theDatabase.Identifier.Returns("One");

var options = new AsyncOptions();
options.SubscribeFromSequence(222L);

(await options.DetermineStartingPositionAsync(2000L, theName, ShardExecutionMode.Continuous, theDatabase, theToken))
.ShouldBe(new Position(222L, true));
}

[Fact]
public async Task subscribe_from_sequence_miss_with_no_prior()
{
theDatabase.ProjectionProgressFor(theName, theToken).Returns(0);
theDatabase.Identifier.Returns("One");

var options = new AsyncOptions();
options.SubscribeFromSequence(222L);

(await options.DetermineStartingPositionAsync(2000L, theName, ShardExecutionMode.Continuous, theDatabase, theToken))
.ShouldBe(new Position(222L, true));
}

[Fact]
public async Task subscribe_from_sequence_hit_with_prior_lower_than_threshold()
{
theDatabase.ProjectionProgressFor(theName, theToken).Returns(200L);
theDatabase.Identifier.Returns("One");

var options = new AsyncOptions();
options.SubscribeFromSequence(222L);

(await options.DetermineStartingPositionAsync(2000L, theName, ShardExecutionMode.Continuous, theDatabase, theToken))
.ShouldBe(new Position(222L, true));
}

[Fact]
public async Task subscribe_from_sequence_hit_with_prior_higher_than_threshold()
{
theDatabase.ProjectionProgressFor(theName, theToken).Returns(500L);
theDatabase.Identifier.Returns("One");

var options = new AsyncOptions();
options.SubscribeFromSequence(222L);

(await options.DetermineStartingPositionAsync(2000L, theName, ShardExecutionMode.Continuous, theDatabase, theToken))
.ShouldBe(new Position(500L, false));
}

[Fact]
public async Task subscribe_from_sequence_hit_with_prior_higher_than_threshold_and_rebuild()
{
theDatabase.ProjectionProgressFor(theName, theToken).Returns(500L);
theDatabase.Identifier.Returns("One");

var options = new AsyncOptions();
options.SubscribeFromSequence(222L);

(await options.DetermineStartingPositionAsync(2000L, theName, ShardExecutionMode.Rebuild, theDatabase, theToken))
.ShouldBe(new Position(222L, true));
}




}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,42 @@ public async Task run_events_through()
progress.ShouldBe(16);
}

[Fact]
public async Task start_subscription_at_sequence_floor()
{
using var daemon = await theStore.BuildProjectionDaemonAsync();
await daemon.StartAllAsync();

var events1 = new object[] { new EventSourcingTests.Aggregation.AEvent(), new EventSourcingTests.Aggregation.AEvent(), new EventSourcingTests.Aggregation.BEvent(), new EventSourcingTests.Aggregation.CEvent() };
var events2 = new object[] { new EventSourcingTests.Aggregation.BEvent(), new EventSourcingTests.Aggregation.AEvent(), new EventSourcingTests.Aggregation.BEvent(), new EventSourcingTests.Aggregation.CEvent() };
var events3 = new object[] { new EventSourcingTests.Aggregation.DEvent(), new EventSourcingTests.Aggregation.AEvent(), new EventSourcingTests.Aggregation.DEvent(), new EventSourcingTests.Aggregation.CEvent() };
var events4 = new object[] { new EventSourcingTests.Aggregation.EEvent(), new EventSourcingTests.Aggregation.BEvent(), new EventSourcingTests.Aggregation.DEvent(), new EventSourcingTests.Aggregation.CEvent() };

theSession.Events.StartStream(Guid.NewGuid(), events1);
theSession.Events.StartStream(Guid.NewGuid(), events2);
theSession.Events.StartStream(Guid.NewGuid(), events3);
theSession.Events.StartStream(Guid.NewGuid(), events4);

await theSession.SaveChangesAsync();

await theStore.WaitForNonStaleProjectionDataAsync(20.Seconds());

var secondSubscription = new FakeSubscription();
var secondStore = SeparateStore(opts =>
{
opts.Projections.Subscribe(secondSubscription, o => o.Options.SubscribeFromSequence(8));
});

await daemon.StopAllAsync();

using var daemon2 = await secondStore.BuildProjectionDaemonAsync();
await daemon2.StartAllAsync();
await Task.Delay(2000); // yeah, I know
await secondStore.WaitForNonStaleProjectionDataAsync(20.Seconds());

secondSubscription.EventsEncountered.Count.ShouldBe(8);
}

[Fact]
public async Task run_events_through_and_rewind_from_scratch()
{
Expand Down
Loading
Loading