diff --git a/src/Marten.CommandLine.Tests/ProjectionControllerTests.cs b/src/Marten.CommandLine.Tests/ProjectionControllerTests.cs index 2034d29b88..838efe0a8e 100644 --- a/src/Marten.CommandLine.Tests/ProjectionControllerTests.cs +++ b/src/Marten.CommandLine.Tests/ProjectionControllerTests.cs @@ -1,3 +1,4 @@ +using Baseline.Dates; using Castle.Core; using Marten.CommandLine.Commands.Projection; using Marten.Events.Daemon; @@ -64,7 +65,7 @@ void IProjectionHost.ListenForUserTriggeredExit() protected readonly List rebuilt = new (); - Task IProjectionHost.TryRebuildShards(IProjectionDatabase database, IReadOnlyList asyncProjectionShards) + Task IProjectionHost.TryRebuildShards(IProjectionDatabase database, IReadOnlyList asyncProjectionShards, TimeSpan? shardTimeout) { foreach (var shard in asyncProjectionShards) { @@ -327,6 +328,28 @@ public async Task rebuilds_all_databases_for_a_single_store() rebuilt.ShouldHaveTheSameElementsAs(expectedRebuilds); } + [Fact] + public async Task rebuilds_all_databases_for_a_single_store_with_shard_timeout() + { + var store = withStore("Marten", new("Foo:All", ProjectionLifecycle.Async), + new("Bar:All", ProjectionLifecycle.Inline)); + + var shards = store.Shards; + + var databases = store.HasDatabases("One", "Two", "Three"); + + // NOTE: ShardTimeout is set in ProjectInput as a test + // but there is no means to assert this value being used by daemon + await theController.Execute(new ProjectionInput { RebuildFlag = true, ShardTimeout = 10.Minutes()}); + + var expectedRebuilds = databases.SelectMany(db => + { + return shards.Select(shard => new RebuildRecord(store, db, shard)); + }).ToArray(); + + rebuilt.ShouldHaveTheSameElementsAs(expectedRebuilds); + } + [Fact] public async Task rebuild_database_that_is_empty() { diff --git a/src/Marten.CommandLine/Commands/Projection/IProjectionHost.cs b/src/Marten.CommandLine/Commands/Projection/IProjectionHost.cs index 58157838ce..b0fe3c4791 100644 --- a/src/Marten.CommandLine/Commands/Projection/IProjectionHost.cs +++ b/src/Marten.CommandLine/Commands/Projection/IProjectionHost.cs @@ -1,5 +1,7 @@ +using System; using System.Collections.Generic; using System.Threading.Tasks; +using Baseline.Dates; using Marten.Events.Daemon; namespace Marten.CommandLine.Commands.Projection; @@ -14,7 +16,7 @@ public interface IProjectionHost { IReadOnlyList AllStores(); void ListenForUserTriggeredExit(); - Task TryRebuildShards(IProjectionDatabase database, IReadOnlyList asyncProjectionShards); + Task TryRebuildShards(IProjectionDatabase database, IReadOnlyList asyncProjectionShards, TimeSpan? shardTimeout=null); Task StartShards(IProjectionDatabase database, IReadOnlyList shards); Task WaitForExit(); -} \ No newline at end of file +} diff --git a/src/Marten.CommandLine/Commands/Projection/ProjectionController.cs b/src/Marten.CommandLine/Commands/Projection/ProjectionController.cs index dbf40b754c..2547a63520 100644 --- a/src/Marten.CommandLine/Commands/Projection/ProjectionController.cs +++ b/src/Marten.CommandLine/Commands/Projection/ProjectionController.cs @@ -78,7 +78,7 @@ public async Task Execute(ProjectionInput input) _view.WriteHeader(database); } - var status = await _host.TryRebuildShards(database, shards).ConfigureAwait(false); + var status = await _host.TryRebuildShards(database, shards, input.ShardTimeout).ConfigureAwait(false); if (status == RebuildStatus.NoData) { @@ -175,4 +175,4 @@ public IReadOnlyList FilterDatabases(ProjectionInput input, return databases; } -} \ No newline at end of file +} diff --git a/src/Marten.CommandLine/Commands/Projection/ProjectionHost.cs b/src/Marten.CommandLine/Commands/Projection/ProjectionHost.cs index 4c10203b8b..7f4646300f 100644 --- a/src/Marten.CommandLine/Commands/Projection/ProjectionHost.cs +++ b/src/Marten.CommandLine/Commands/Projection/ProjectionHost.cs @@ -5,6 +5,7 @@ using System.Runtime.Loader; using System.Threading; using System.Threading.Tasks; +using Baseline.Dates; using Marten.Events.Daemon; using Microsoft.Extensions.Hosting; @@ -57,7 +58,7 @@ public void Shutdown() _completion.TrySetResult(true); } - public async Task TryRebuildShards(IProjectionDatabase database, IReadOnlyList asyncProjectionShards) + public async Task TryRebuildShards(IProjectionDatabase database, IReadOnlyList asyncProjectionShards, TimeSpan? shardTimeout=null) { using var daemon = database.BuildDaemon(); await daemon.StartDaemon().ConfigureAwait(false); @@ -80,13 +81,31 @@ public async Task TryRebuildShards(IProjectionDatabase database, #if NET6_0_OR_GREATER await Parallel.ForEachAsync(projectionNames, _cancellation.Token, - async (projectionName, token) => - await daemon.RebuildProjection(projectionName, token).ConfigureAwait(false)) - .ConfigureAwait(false); + async (projectionName, token) => + { + if (shardTimeout == null) + { + await daemon.RebuildProjection(projectionName, token).ConfigureAwait(true); + } + else + { + await daemon.RebuildProjection(projectionName, shardTimeout.Value, token).ConfigureAwait(true); + } + }) + .ConfigureAwait(false); #else var tasks = projectionNames - .Select(x => Task.Run(async () => await daemon.RebuildProjection(x, _cancellation.Token).ConfigureAwait(false), _cancellation.Token)) + .Select(x => Task.Run(async () => { + if (shardTimeout == null) + { + await daemon.RebuildProjection(x, _cancellation.Token).ConfigureAwait(false); + } + else + { + await daemon.RebuildProjection(x, shardTimeout.Value, _cancellation.Token).ConfigureAwait(false); + } + }, _cancellation.Token)) .ToArray(); await Task.WhenAll(tasks).ConfigureAwait(false); @@ -116,4 +135,4 @@ public Task WaitForExit() { return _completion.Task; } -} \ No newline at end of file +} diff --git a/src/Marten.CommandLine/Commands/Projection/ProjectionInput.cs b/src/Marten.CommandLine/Commands/Projection/ProjectionInput.cs index 549da3437b..e029dba5e6 100644 --- a/src/Marten.CommandLine/Commands/Projection/ProjectionInput.cs +++ b/src/Marten.CommandLine/Commands/Projection/ProjectionInput.cs @@ -1,3 +1,4 @@ +using System; using System.Collections.Generic; using System.Linq; using Baseline; @@ -33,6 +34,9 @@ public ProjectionInput() [Description("If specified, only execute against the named Marten database within the specified store(s). Does not apply with only one store")] public string DatabaseFlag { get; set; } + [Description("If specified, use this shard timeout value for daemon")] + public TimeSpan? ShardTimeout { get; set; } + internal IList BuildShards(DocumentStore store) { var projections = store @@ -89,4 +93,4 @@ internal IList SelectProjectionsForRebuild(DocumentStore stor return projections; } -} \ No newline at end of file +}