diff --git a/docs/.vitepress/config.mts b/docs/.vitepress/config.mts index f84f0b789f..0ede3a8db5 100644 --- a/docs/.vitepress/config.mts +++ b/docs/.vitepress/config.mts @@ -181,8 +181,11 @@ const config: UserConfig = { { text: 'Rebuilding Projections', link: '/events/projections/rebuilding' }, { text: 'Projections and IoC Services', link: '/events/projections/ioc' }, { text: 'Async Daemon HealthChecks', link: '/events/projections/healthchecks' },] + }, + { + text: 'Event Subscriptions', + link: '/events/subscriptions' }, - { text: 'Event Versioning', link: '/events/versioning' diff --git a/docs/events/streaming/index.md b/docs/events/streaming/index.md deleted file mode 100644 index 10be6e4b09..0000000000 --- a/docs/events/streaming/index.md +++ /dev/null @@ -1,3 +0,0 @@ -# TODO - -Add general introduction to the streaming/queuing concept and locate Marten inside that. diff --git a/docs/events/streaming/sink.md b/docs/events/streaming/sink.md deleted file mode 100644 index c9a0135af8..0000000000 --- a/docs/events/streaming/sink.md +++ /dev/null @@ -1,3 +0,0 @@ -# TODO - -Explain how to put data from queues to Marten diff --git a/docs/events/streaming/source.md b/docs/events/streaming/source.md deleted file mode 100644 index 2884a00cf5..0000000000 --- a/docs/events/streaming/source.md +++ /dev/null @@ -1,3 +0,0 @@ -# TODO - -Explain how to put data to queues to Marten diff --git a/docs/events/subscriptions.md b/docs/events/subscriptions.md new file mode 100644 index 0000000000..b71e2ca2f6 --- /dev/null +++ b/docs/events/subscriptions.md @@ -0,0 +1,35 @@ +# Event Subscriptions + +Hey folks, there will be much more on this topic soon. Right now, it's in Marten > 7.6, but part of the core Marten team is +working with a client to prove out this feature. When that's done, we'll fill in the docs and sample code. + + + +```cs +/// +/// Basic abstraction for custom subscriptions to Marten events through the async daemon. Use this in +/// order to do custom processing against an ordered stream of the events +/// +public interface ISubscription : IAsyncDisposable +{ + /// + /// Processes a page of events at a time + /// + /// + /// Use to log dead letter events that are skipped or to stop the subscription from processing based on an exception + /// Access to Marten queries and writes that will be committed with the progress update for this subscription + /// + /// + Task ProcessEventsAsync(EventRange page, ISubscriptionController controller, + IDocumentOperations operations, + CancellationToken cancellationToken); +} +``` +snippet source | anchor + + +## Registering Subscriptions + +## Event Filtering + +## Rewinding or Replaying Subscriptions diff --git a/src/Marten.AsyncDaemon.Testing/AsyncOptionsTests.cs b/src/Marten.AsyncDaemon.Testing/AsyncOptionsTests.cs index 4f843b7941..4336583b4a 100644 --- a/src/Marten.AsyncDaemon.Testing/AsyncOptionsTests.cs +++ b/src/Marten.AsyncDaemon.Testing/AsyncOptionsTests.cs @@ -1,13 +1,23 @@ +using System; +using System.Threading; +using System.Threading.Tasks; using Marten.Events.Daemon; using Marten.Internal.Operations; +using Marten.Storage; using Marten.Testing.Documents; +using Marten.Testing.Harness; using NSubstitute; +using Shouldly; using Xunit; namespace Marten.AsyncDaemon.Testing; public class AsyncOptionsTests { + private readonly IMartenDatabase theDatabase = Substitute.For(); + private readonly ShardName theName = new ShardName("Fake", "All"); + private readonly CancellationToken theToken = CancellationToken.None; + [Fact] public void teardown_by_view_type_1() { @@ -22,4 +32,213 @@ public void teardown_by_view_type_1() operations.Received().QueueOperation(new TruncateTable(typeof(Target))); operations.Received().QueueOperation(new TruncateTable(typeof(User))); } + + [Fact] + public async Task determine_starting_position_if_rebuild() + { + var options = new AsyncOptions(); + (await options.DetermineStartingPositionAsync(2000L, theName, ShardExecutionMode.Rebuild, theDatabase, theToken)) + .ShouldBe(new Position(0, true)); + + } + + [Fact] + public async Task determine_starting_position_if_continuous_and_no_other_constraints() + { + theDatabase.ProjectionProgressFor(theName, theToken).Returns(111L); + + var options = new AsyncOptions(); + (await options.DetermineStartingPositionAsync(2000L, theName, ShardExecutionMode.Continuous, theDatabase, theToken)) + .ShouldBe(new Position(111L, false)); + } + + [Fact] + public async Task subscribe_from_present() + { + var options = new AsyncOptions(); + options.SubscribeFromPresent(); + + (await options.DetermineStartingPositionAsync(2000L, theName, ShardExecutionMode.Continuous, theDatabase, theToken)) + .ShouldBe(new Position(2000L, true)); + } + + [Fact] + public async Task do_not_match_on_database_name() + { + theDatabase.ProjectionProgressFor(theName, theToken).Returns(111L); + theDatabase.Identifier.Returns("One"); + + var options = new AsyncOptions(); + options.SubscribeFromPresent("Two"); + + (await options.DetermineStartingPositionAsync(2000L, theName, ShardExecutionMode.Continuous, theDatabase, theToken)) + .ShouldBe(new Position(111L, false)); + } + + [Fact] + public async Task do_match_on_database_name() + { + theDatabase.ProjectionProgressFor(theName, theToken).Returns(111L); + theDatabase.Identifier.Returns("One"); + + var options = new AsyncOptions(); + options.SubscribeFromPresent("One"); + + (await options.DetermineStartingPositionAsync(2000L, theName, ShardExecutionMode.Continuous, theDatabase, theToken)) + .ShouldBe(new Position(2000L, true)); + } + + [Fact] + public async Task subscribe_from_time_hit_with_no_prior() + { + theDatabase.ProjectionProgressFor(theName, theToken).Returns(0); + theDatabase.Identifier.Returns("One"); + + var subscriptionTime = (DateTimeOffset)DateTime.Today; + + theDatabase.FindEventStoreFloorAtTimeAsync(subscriptionTime, theToken).Returns(222L); + + var options = new AsyncOptions(); + options.SubscribeFromTime(subscriptionTime); + + (await options.DetermineStartingPositionAsync(2000L, theName, ShardExecutionMode.Continuous, theDatabase, theToken)) + .ShouldBe(new Position(222L, true)); + } + + [Fact] + public async Task subscribe_from_time_miss_with_no_prior() + { + theDatabase.ProjectionProgressFor(theName, theToken).Returns(0); + theDatabase.Identifier.Returns("One"); + + var subscriptionTime = (DateTimeOffset)DateTime.Today; + + theDatabase.FindEventStoreFloorAtTimeAsync(subscriptionTime, theToken).Returns((long?)null); + + var options = new AsyncOptions(); + options.SubscribeFromTime(subscriptionTime); + + (await options.DetermineStartingPositionAsync(2000L, theName, ShardExecutionMode.Continuous, theDatabase, theToken)) + .ShouldBe(new Position(0, false)); + } + + [Fact] + public async Task subscribe_from_time_hit_with_prior_lower_than_threshold() + { + theDatabase.ProjectionProgressFor(theName, theToken).Returns(200L); + theDatabase.Identifier.Returns("One"); + + var subscriptionTime = (DateTimeOffset)DateTime.Today; + + theDatabase.FindEventStoreFloorAtTimeAsync(subscriptionTime, theToken).Returns(222L); + + var options = new AsyncOptions(); + options.SubscribeFromTime(subscriptionTime); + + (await options.DetermineStartingPositionAsync(2000L, theName, ShardExecutionMode.Continuous, theDatabase, theToken)) + .ShouldBe(new Position(222L, true)); + } + + [Fact] + public async Task subscribe_from_time_hit_with_prior_higher_than_threshold() + { + theDatabase.ProjectionProgressFor(theName, theToken).Returns(500L); + theDatabase.Identifier.Returns("One"); + + var subscriptionTime = (DateTimeOffset)DateTime.Today; + + theDatabase.FindEventStoreFloorAtTimeAsync(subscriptionTime, theToken).Returns(222L); + + var options = new AsyncOptions(); + options.SubscribeFromTime(subscriptionTime); + + (await options.DetermineStartingPositionAsync(2000L, theName, ShardExecutionMode.Continuous, theDatabase, theToken)) + .ShouldBe(new Position(500L, false)); + } + + [Fact] + public async Task subscribe_from_time_hit_with_prior_higher_than_threshold_and_rebuild() + { + theDatabase.ProjectionProgressFor(theName, theToken).Returns(500L); + theDatabase.Identifier.Returns("One"); + + var subscriptionTime = (DateTimeOffset)DateTime.Today; + + theDatabase.FindEventStoreFloorAtTimeAsync(subscriptionTime, theToken).Returns(222L); + + var options = new AsyncOptions(); + options.SubscribeFromTime(subscriptionTime); + + (await options.DetermineStartingPositionAsync(2000L, theName, ShardExecutionMode.Rebuild, theDatabase, theToken)) + .ShouldBe(new Position(222L, true)); + } + + [Fact] + public async Task subscribe_from_sequence_hit_with_no_prior() + { + theDatabase.ProjectionProgressFor(theName, theToken).Returns(100); + theDatabase.Identifier.Returns("One"); + + var options = new AsyncOptions(); + options.SubscribeFromSequence(222L); + + (await options.DetermineStartingPositionAsync(2000L, theName, ShardExecutionMode.Continuous, theDatabase, theToken)) + .ShouldBe(new Position(222L, true)); + } + + [Fact] + public async Task subscribe_from_sequence_miss_with_no_prior() + { + theDatabase.ProjectionProgressFor(theName, theToken).Returns(0); + theDatabase.Identifier.Returns("One"); + + var options = new AsyncOptions(); + options.SubscribeFromSequence(222L); + + (await options.DetermineStartingPositionAsync(2000L, theName, ShardExecutionMode.Continuous, theDatabase, theToken)) + .ShouldBe(new Position(222L, true)); + } + + [Fact] + public async Task subscribe_from_sequence_hit_with_prior_lower_than_threshold() + { + theDatabase.ProjectionProgressFor(theName, theToken).Returns(200L); + theDatabase.Identifier.Returns("One"); + + var options = new AsyncOptions(); + options.SubscribeFromSequence(222L); + + (await options.DetermineStartingPositionAsync(2000L, theName, ShardExecutionMode.Continuous, theDatabase, theToken)) + .ShouldBe(new Position(222L, true)); + } + + [Fact] + public async Task subscribe_from_sequence_hit_with_prior_higher_than_threshold() + { + theDatabase.ProjectionProgressFor(theName, theToken).Returns(500L); + theDatabase.Identifier.Returns("One"); + + var options = new AsyncOptions(); + options.SubscribeFromSequence(222L); + + (await options.DetermineStartingPositionAsync(2000L, theName, ShardExecutionMode.Continuous, theDatabase, theToken)) + .ShouldBe(new Position(500L, false)); + } + + [Fact] + public async Task subscribe_from_sequence_hit_with_prior_higher_than_threshold_and_rebuild() + { + theDatabase.ProjectionProgressFor(theName, theToken).Returns(500L); + theDatabase.Identifier.Returns("One"); + + var options = new AsyncOptions(); + options.SubscribeFromSequence(222L); + + (await options.DetermineStartingPositionAsync(2000L, theName, ShardExecutionMode.Rebuild, theDatabase, theToken)) + .ShouldBe(new Position(222L, true)); + } + + + + } diff --git a/src/Marten.AsyncDaemon.Testing/Subscriptions/subscriptions_end_to_end.cs b/src/Marten.AsyncDaemon.Testing/Subscriptions/subscriptions_end_to_end.cs index 27e010c632..020236136b 100644 --- a/src/Marten.AsyncDaemon.Testing/Subscriptions/subscriptions_end_to_end.cs +++ b/src/Marten.AsyncDaemon.Testing/Subscriptions/subscriptions_end_to_end.cs @@ -58,6 +58,42 @@ public async Task run_events_through() progress.ShouldBe(16); } + [Fact] + public async Task start_subscription_at_sequence_floor() + { + using var daemon = await theStore.BuildProjectionDaemonAsync(); + await daemon.StartAllAsync(); + + var events1 = new object[] { new EventSourcingTests.Aggregation.AEvent(), new EventSourcingTests.Aggregation.AEvent(), new EventSourcingTests.Aggregation.BEvent(), new EventSourcingTests.Aggregation.CEvent() }; + var events2 = new object[] { new EventSourcingTests.Aggregation.BEvent(), new EventSourcingTests.Aggregation.AEvent(), new EventSourcingTests.Aggregation.BEvent(), new EventSourcingTests.Aggregation.CEvent() }; + var events3 = new object[] { new EventSourcingTests.Aggregation.DEvent(), new EventSourcingTests.Aggregation.AEvent(), new EventSourcingTests.Aggregation.DEvent(), new EventSourcingTests.Aggregation.CEvent() }; + var events4 = new object[] { new EventSourcingTests.Aggregation.EEvent(), new EventSourcingTests.Aggregation.BEvent(), new EventSourcingTests.Aggregation.DEvent(), new EventSourcingTests.Aggregation.CEvent() }; + + theSession.Events.StartStream(Guid.NewGuid(), events1); + theSession.Events.StartStream(Guid.NewGuid(), events2); + theSession.Events.StartStream(Guid.NewGuid(), events3); + theSession.Events.StartStream(Guid.NewGuid(), events4); + + await theSession.SaveChangesAsync(); + + await theStore.WaitForNonStaleProjectionDataAsync(20.Seconds()); + + var secondSubscription = new FakeSubscription(); + var secondStore = SeparateStore(opts => + { + opts.Projections.Subscribe(secondSubscription, o => o.Options.SubscribeFromSequence(8)); + }); + + await daemon.StopAllAsync(); + + using var daemon2 = await secondStore.BuildProjectionDaemonAsync(); + await daemon2.StartAllAsync(); + await Task.Delay(2000); // yeah, I know + await secondStore.WaitForNonStaleProjectionDataAsync(20.Seconds()); + + secondSubscription.EventsEncountered.Count.ShouldBe(8); + } + [Fact] public async Task run_events_through_and_rewind_from_scratch() { diff --git a/src/Marten/Events/Daemon/AsyncOptions.cs b/src/Marten/Events/Daemon/AsyncOptions.cs index 4216202cb3..983ca0412f 100644 --- a/src/Marten/Events/Daemon/AsyncOptions.cs +++ b/src/Marten/Events/Daemon/AsyncOptions.cs @@ -1,8 +1,15 @@ using System; using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using JasperFx.Core; using Marten.Internal.Operations; +using Marten.Storage; using Weasel.Core; +#nullable enable + namespace Marten.Events.Daemon; /// @@ -83,4 +90,148 @@ internal void Teardown(IDocumentOperations operations) { foreach (var action in _actions) action(operations); } + + internal Task DetermineStartingPositionAsync(long highWaterMark, ShardName name, ShardExecutionMode mode, + IMartenDatabase database, + CancellationToken token) + { + var strategy = matchStrategy(database); + return strategy.DetermineStartingPositionAsync(highWaterMark, name, mode, database, token); + } + + private IPositionStrategy matchStrategy(IMartenDatabase database) + { + return _strategies.Where(x => x.DatabaseName.IsNotEmpty()).FirstOrDefault(x => x.DatabaseName!.EqualsIgnoreCase(database.Identifier)) + ?? _strategies.FirstOrDefault(x => x.DatabaseName.IsEmpty()) ?? CatchUp.Instance; + } + + private readonly List _strategies = new(); + + /// + /// Direct that this subscription or projection should only start from events that are appended + /// after the subscription is started + /// + /// Optionally applies this rule to *only* the named database in the case of + /// using a multi-tenancy per multiple databases strategy + /// + public AsyncOptions SubscribeFromPresent(string? databaseIdentifier = null) + { + _strategies.Add(new FromPresent(databaseIdentifier)); + return this; + } + + /// + /// Direct that this subscription or projection should only start from events that have a timestamp + /// greater than the supplied eventTimestampFloor + /// + /// The floor time of the events where this subscription should be started + /// Optionally applies this rule to *only* the named database in the case of + /// using a multi-tenancy per multiple databases strategy + /// + public AsyncOptions SubscribeFromTime(DateTimeOffset eventTimestampFloor, string? databaseIdentifier = null) + { + _strategies.Add(new FromTime(databaseIdentifier, eventTimestampFloor)); + return this; + } + + /// + /// Direct that this subscription or projection should only start from events that have a sequence + /// greater than the supplied sequenceFloor + /// + /// + /// Optionally applies this rule to *only* the named database in the case of + /// using a multi-tenancy per multiple databases strategy + /// + public AsyncOptions SubscribeFromSequence(long sequenceFloor, string? databaseIdentifier = null) + { + _strategies.Add(new FromSequence(databaseIdentifier, sequenceFloor)); + return this; + } +} + +internal record Position(long Floor, bool ShouldUpdateProgressFirst); + +internal interface IPositionStrategy +{ + string? DatabaseName { get;} + + Task DetermineStartingPositionAsync(long highWaterMark, ShardName name, ShardExecutionMode mode, + IMartenDatabase database, + CancellationToken token); +} + +internal class FromSequence(string? databaseName, long sequence): IPositionStrategy +{ + public string? DatabaseName { get; } = databaseName; + public long Sequence { get; } = sequence; + + public async Task DetermineStartingPositionAsync(long highWaterMark, ShardName name, + ShardExecutionMode mode, + IMartenDatabase database, CancellationToken token) + { + if (mode == ShardExecutionMode.Rebuild) + { + return new Position(Sequence, true); + } + + var current = await database.ProjectionProgressFor(name, token).ConfigureAwait(false); + + return current >= Sequence + ? new Position(current, false) + : new Position(Sequence, true); + } +} + +internal class FromTime(string? databaseName, DateTimeOffset time): IPositionStrategy +{ + public string? DatabaseName { get; } = databaseName; + public DateTimeOffset EventFloorTime { get; } = time; + + public async Task DetermineStartingPositionAsync(long highWaterMark, ShardName name, + ShardExecutionMode mode, + IMartenDatabase database, CancellationToken token) + { + var floor = await database.FindEventStoreFloorAtTimeAsync(EventFloorTime, token).ConfigureAwait(false) ?? 0; + + if (mode == ShardExecutionMode.Rebuild) + { + return new Position(floor, true); + } + + var current = await database.ProjectionProgressFor(name, token).ConfigureAwait(false); + + return current >= floor ? new Position(current, false) : new Position(floor, true); + } +} + +internal class FromPresent(string? databaseName): IPositionStrategy +{ + public string? DatabaseName { get; } = databaseName; + + public Task DetermineStartingPositionAsync(long highWaterMark, ShardName name, ShardExecutionMode mode, + IMartenDatabase database, CancellationToken token) + { + return Task.FromResult(new Position(highWaterMark, true)); + } +} + +internal class CatchUp: IPositionStrategy +{ + internal static CatchUp Instance = new(); + + private CatchUp(){} + + public string? DatabaseName { get; set; } = null; + + public async Task DetermineStartingPositionAsync(long highWaterMark, ShardName name, + ShardExecutionMode mode, + IMartenDatabase database, + CancellationToken token) + { + return mode == ShardExecutionMode.Continuous + ? new Position(await database.ProjectionProgressFor(name, token).ConfigureAwait(false), false) + + // No point in doing the extra database hop + : new Position(0, true); + } } diff --git a/src/Marten/Events/Daemon/ISubscriptionAgent.cs b/src/Marten/Events/Daemon/ISubscriptionAgent.cs index 6fea953a9d..78796d1ac9 100644 --- a/src/Marten/Events/Daemon/ISubscriptionAgent.cs +++ b/src/Marten/Events/Daemon/ISubscriptionAgent.cs @@ -75,5 +75,6 @@ public interface ISubscriptionAgent: ISubscriptionController Task RecordDeadLetterEventAsync(DeadLetterEvent @event); DateTimeOffset? PausedTime { get; } + AsyncOptions Options { get; } Task ReplayAsync(SubscriptionExecutionRequest request, long highWaterMark, TimeSpan timeout); } diff --git a/src/Marten/Events/Daemon/Internals/SubscriptionAgent.cs b/src/Marten/Events/Daemon/Internals/SubscriptionAgent.cs index bcd970aea2..41291ebc82 100644 --- a/src/Marten/Events/Daemon/Internals/SubscriptionAgent.cs +++ b/src/Marten/Events/Daemon/Internals/SubscriptionAgent.cs @@ -11,7 +11,6 @@ namespace Marten.Events.Daemon.Internals; public class SubscriptionAgent: ISubscriptionAgent, IAsyncDisposable { - private readonly AsyncOptions _options; private readonly IEventLoader _loader; private readonly ISubscriptionExecution _execution; private readonly ShardStateTracker _tracker; @@ -24,7 +23,7 @@ public class SubscriptionAgent: ISubscriptionAgent, IAsyncDisposable public SubscriptionAgent(ShardName name, AsyncOptions options, IEventLoader loader, ISubscriptionExecution execution, ShardStateTracker tracker, ILogger logger) { - _options = options; + Options = options; _loader = loader; _execution = execution; _tracker = tracker; @@ -40,6 +39,8 @@ public SubscriptionAgent(ShardName name, AsyncOptions options, IEventLoader load } } + public AsyncOptions Options { get; } + public string ProjectionShardIdentity { get; private set; } public CancellationToken CancellationToken => _cancellation.Token; @@ -248,7 +249,7 @@ internal async Task Apply(Command command) var inflight = LastEnqueued - LastCommitted; // Back pressure, slow down - if (inflight >= _options.MaximumHopperSize) return; + if (inflight >= Options.MaximumHopperSize) return; // If all caught up, do nothing! // Not sure how either of these numbers could actually be higher than @@ -257,7 +258,7 @@ internal async Task Apply(Command command) if (LastEnqueued >= HighWaterMark) return; // You could maybe get a full size batch, so go get the next - if (HighWaterMark - LastEnqueued > _options.BatchSize) + if (HighWaterMark - LastEnqueued > Options.BatchSize) { await loadNextAsync().ConfigureAwait(false); } @@ -265,7 +266,7 @@ internal async Task Apply(Command command) { // If the execution is busy, let's let events accumulate a little // more - var twoBatchSize = 2 * _options.BatchSize; + var twoBatchSize = 2 * Options.BatchSize; if (inflight < twoBatchSize) { await loadNextAsync().ConfigureAwait(false); @@ -278,7 +279,7 @@ private async Task loadNextAsync() var request = new EventRequest { HighWater = HighWaterMark, - BatchSize = _options.BatchSize, + BatchSize = Options.BatchSize, Floor = LastEnqueued, ErrorOptions = ErrorOptions, Runtime = _runtime, diff --git a/src/Marten/Events/Daemon/ProjectionDaemon.Rebuilding.cs b/src/Marten/Events/Daemon/ProjectionDaemon.Rebuilding.cs index 43510dcb3e..90b9aa86af 100644 --- a/src/Marten/Events/Daemon/ProjectionDaemon.Rebuilding.cs +++ b/src/Marten/Events/Daemon/ProjectionDaemon.Rebuilding.cs @@ -217,6 +217,36 @@ public async Task RewindSubscriptionAsync(string subscriptionName, CancellationT var agents = _factory.BuildAgents(subscriptionName, Database); + await rewindSubscriptionProgress(subscriptionName, token, sequenceFloor, agents).ConfigureAwait(false); + + foreach (var agent in agents) + { + Tracker.MarkAsRestarted(agent.Name); + var errorOptions = _store.Options.Projections.RebuildErrors; + await agent.StartAsync(new SubscriptionExecutionRequest(sequenceFloor.Value, ShardExecutionMode.Continuous, + errorOptions, this)).ConfigureAwait(false); + agent.MarkHighWater(HighWaterMark()); + } + } + + private async Task rewindAgentProgress(string shardName, CancellationToken token, long sequenceFloor) + { + var sessionOptions = SessionOptions.ForDatabase(Database); + sessionOptions.AllowAnyTenant = true; + await using var session = _store.LightweightSession(sessionOptions); + + session.QueueSqlCommand($"delete from {_store.Options.EventGraph.ProgressionTable} where name = ?", shardName); + if (sequenceFloor > 0) + { + session.QueueSqlCommand($"update {_store.Options.EventGraph.ProgressionTable} set last_seq_id = ? where name = ?", sequenceFloor, shardName); + } + + await session.SaveChangesAsync(token).ConfigureAwait(false); + } + + private async Task rewindSubscriptionProgress(string subscriptionName, CancellationToken token, long? sequenceFloor, + IReadOnlyList agents) + { var sessionOptions = SessionOptions.ForDatabase(Database); sessionOptions.AllowAnyTenant = true; await using var session = _store.LightweightSession(sessionOptions); @@ -237,14 +267,5 @@ public async Task RewindSubscriptionAsync(string subscriptionName, CancellationT session.DeleteWhere(x => x.ProjectionName == subscriptionName && x.EventSequence >= sequenceFloor); await session.SaveChangesAsync(token).ConfigureAwait(false); - - foreach (var agent in agents) - { - Tracker.MarkAsRestarted(agent.Name); - var errorOptions = _store.Options.Projections.RebuildErrors; - await agent.StartAsync(new SubscriptionExecutionRequest(sequenceFloor.Value, ShardExecutionMode.Continuous, - errorOptions, this)).ConfigureAwait(false); - agent.MarkHighWater(HighWaterMark()); - } } } diff --git a/src/Marten/Events/Daemon/ProjectionDaemon.cs b/src/Marten/Events/Daemon/ProjectionDaemon.cs index 1703fe0959..86d4cb532a 100644 --- a/src/Marten/Events/Daemon/ProjectionDaemon.cs +++ b/src/Marten/Events/Daemon/ProjectionDaemon.cs @@ -94,18 +94,22 @@ private async Task tryStartAgentAsync(ISubscriptionAgent agent, ShardExecu return false; } - var position = mode == ShardExecutionMode.Continuous - ? await Database.ProjectionProgressFor(agent.Name, _cancellation.Token).ConfigureAwait(false) + var highWaterMark = HighWaterMark(); + var position = await agent + .Options + .DetermineStartingPositionAsync(highWaterMark, agent.Name, mode, Database, _cancellation.Token).ConfigureAwait(false); - // No point in doing the extra database hop - : 0; + if (position.ShouldUpdateProgressFirst) + { + await rewindAgentProgress(agent.Name.Identity, _cancellation.Token, position.Floor).ConfigureAwait(false); + } var errorOptions = mode == ShardExecutionMode.Continuous ? _store.Options.Projections.Errors : _store.Options.Projections.RebuildErrors; - await agent.StartAsync(new SubscriptionExecutionRequest(position, mode, errorOptions, this)).ConfigureAwait(false); - agent.MarkHighWater(HighWaterMark()); + await agent.StartAsync(new SubscriptionExecutionRequest(position.Floor, mode, errorOptions, this)).ConfigureAwait(false); + agent.MarkHighWater(highWaterMark); _agents = _agents.AddOrUpdate(agent.Name.Identity, agent); } diff --git a/src/Marten/Storage/IMartenDatabase.cs b/src/Marten/Storage/IMartenDatabase.cs index 14b8e3297c..377c229e2e 100644 --- a/src/Marten/Storage/IMartenDatabase.cs +++ b/src/Marten/Storage/IMartenDatabase.cs @@ -110,4 +110,12 @@ Task ProjectionProgressFor(ShardName name, CancellationToken token = default); + /// + /// Find the position of the event store sequence just below the supplied timestamp. Will + /// return null if there are no events below that time threshold + /// + /// + /// + /// + Task FindEventStoreFloorAtTimeAsync(DateTimeOffset timestamp, CancellationToken token); } diff --git a/src/Marten/Storage/StandinDatabase.cs b/src/Marten/Storage/StandinDatabase.cs index 123d28d05f..5c8b2924f7 100644 --- a/src/Marten/Storage/StandinDatabase.cs +++ b/src/Marten/Storage/StandinDatabase.cs @@ -223,6 +223,11 @@ public Task ProjectionProgressFor(ShardName name, CancellationToken token throw new NotImplementedException(); } + public async Task FindEventStoreFloorAtTimeAsync(DateTimeOffset timestamp, CancellationToken token) + { + throw new NotImplementedException(); + } + public void Dispose() { ((IDisposable)Tracker)?.Dispose(); diff --git a/src/Marten/Subscriptions/ISubscription.cs b/src/Marten/Subscriptions/ISubscription.cs index 407bd0e0a0..0facbacff7 100644 --- a/src/Marten/Subscriptions/ISubscription.cs +++ b/src/Marten/Subscriptions/ISubscription.cs @@ -6,6 +6,8 @@ namespace Marten.Subscriptions; +#region sample_ISubscription + /// /// Basic abstraction for custom subscriptions to Marten events through the async daemon. Use this in /// order to do custom processing against an ordered stream of the events @@ -24,3 +26,5 @@ Task ProcessEventsAsync(EventRange page, ISubscriptionControlle IDocumentOperations operations, CancellationToken cancellationToken); } + +#endregion