From 482572a56d07c8b99619f81756909072e46de29a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20K=C3=BCsters?= Date: Tue, 12 Dec 2023 10:01:34 +0100 Subject: [PATCH 1/2] Switched to MaxAutoLockRenewalDuration --- ...zureServiceBusCommandSessionProcessor`1.cs | 42 +------------------ .../Setup/AzureServiceBusSetupEnvironment.cs | 11 +++-- 2 files changed, 7 insertions(+), 46 deletions(-) diff --git a/src/extensions/azureServiceBus/Wemogy.CQRS.Extensions.AzureServiceBus/Processors/AzureServiceBusCommandSessionProcessor`1.cs b/src/extensions/azureServiceBus/Wemogy.CQRS.Extensions.AzureServiceBus/Processors/AzureServiceBusCommandSessionProcessor`1.cs index 574fadf..6fde071 100644 --- a/src/extensions/azureServiceBus/Wemogy.CQRS.Extensions.AzureServiceBus/Processors/AzureServiceBusCommandSessionProcessor`1.cs +++ b/src/extensions/azureServiceBus/Wemogy.CQRS.Extensions.AzureServiceBus/Processors/AzureServiceBusCommandSessionProcessor`1.cs @@ -27,12 +27,10 @@ public class AzureServiceBusCommandSessionProcessor : IAzureServiceBus public bool IsAlive => _isStarted && !_serviceBusSessionProcessor.IsClosed; private readonly string _handleMessageActivityName; - private readonly TimeSpan _renewSessionLockInterval; public AzureServiceBusCommandSessionProcessor( ServiceBusSessionProcessor serviceBusSessionProcessor, - IServiceCollection serviceCollection, - TimeSpan renewSessionLockInterval) + IServiceCollection serviceCollection) { _serviceBusSessionProcessor = serviceBusSessionProcessor; _serviceCollection = serviceCollection; @@ -46,7 +44,6 @@ public AzureServiceBusCommandSessionProcessor( .BuildServiceProvider() .GetRequiredService(); _handleMessageActivityName = $"HandleMessageOf{typeof(TCommand).Name}"; - _renewSessionLockInterval = renewSessionLockInterval; } public async Task HandleMessageAsync(ProcessSessionMessageEventArgs arg) @@ -87,41 +84,11 @@ public async Task HandleMessageAsync(ProcessSessionMessageEventArgs arg) var scopeFactory = services.BuildServiceProvider().GetRequiredService(); var scope = scopeFactory.CreateScope(); - var renewSessionLockCancellationTokenSource = new CancellationTokenSource(); - var renewSessionLockCancellationToken = renewSessionLockCancellationTokenSource.Token; - Task? renewSessionLockTask = null; try { var scheduledCommandRunner = scope.ServiceProvider.GetRequiredService>(); - var scheduledCommandRunnerTask = scheduledCommandRunner.RunAsync(scheduledCommand); - - // renew session lock every 30 seconds - renewSessionLockTask = Task.Run( - async () => - { - while (!renewSessionLockCancellationToken.IsCancellationRequested && - !scheduledCommandRunnerTask.IsCompleted) - { - await Task.Delay(_renewSessionLockInterval, renewSessionLockCancellationToken); - Console.WriteLine($"Renewing session lock for session{arg.SessionId}..."); - try - { - await arg.RenewSessionLockAsync(renewSessionLockCancellationToken); - Console.WriteLine($"Renewed session lock for session {arg.SessionId}"); - } - catch (Exception e) - { - Console.WriteLine($"Failed to renew session lock for session {arg.SessionId}"); - Console.WriteLine(e); - throw; - } - } - }, - renewSessionLockCancellationTokenSource.Token); - - await scheduledCommandRunnerTask; - renewSessionLockCancellationTokenSource.Cancel(); + await scheduledCommandRunner.RunAsync(scheduledCommand); } catch (Exception e) { @@ -129,11 +96,6 @@ public async Task HandleMessageAsync(ProcessSessionMessageEventArgs arg) Console.WriteLine(e); throw; } - finally - { - renewSessionLockCancellationTokenSource.Dispose(); - renewSessionLockTask?.Dispose(); - } } public async Task StartAsync(CancellationToken cancellationToken) diff --git a/src/extensions/azureServiceBus/Wemogy.CQRS.Extensions.AzureServiceBus/Setup/AzureServiceBusSetupEnvironment.cs b/src/extensions/azureServiceBus/Wemogy.CQRS.Extensions.AzureServiceBus/Setup/AzureServiceBusSetupEnvironment.cs index 1f1938f..ff93b8f 100644 --- a/src/extensions/azureServiceBus/Wemogy.CQRS.Extensions.AzureServiceBus/Setup/AzureServiceBusSetupEnvironment.cs +++ b/src/extensions/azureServiceBus/Wemogy.CQRS.Extensions.AzureServiceBus/Setup/AzureServiceBusSetupEnvironment.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Threading; using Azure.Messaging.ServiceBus; using Microsoft.Extensions.DependencyInjection; using Wemogy.Core.Errors; @@ -56,12 +57,10 @@ public AzureServiceBusSetupEnvironment AddDelayedProcessor( /// The maximum number of concurrent sessions (default 1) /// The maximum number of concurrent calls per session (default 1) /// Optional custom configuration of the ServiceBusSessionProcessorOptions - /// The interval to renew the session lock (default 1 minute) public AzureServiceBusSetupEnvironment AddDelayedSessionProcessor( int maxConcurrentSessions = 1, int maxConcurrentCallsPerSession = 1, - Action? configureSessionProcessorOptions = null, - TimeSpan? renewSessionLockInterval = null) + Action? configureSessionProcessorOptions = null) where TCommand : ICommandBase { var queueName = GetQueueName(); @@ -79,7 +78,8 @@ public AzureServiceBusSetupEnvironment AddDelayedSessionProcessor( { MaxConcurrentSessions = maxConcurrentSessions, MaxConcurrentCallsPerSession = maxConcurrentCallsPerSession, - SessionIdleTimeout = TimeSpan.FromSeconds(2) + SessionIdleTimeout = TimeSpan.FromSeconds(2), + MaxAutoLockRenewalDuration = Timeout.InfiniteTimeSpan }; configureSessionProcessorOptions?.Invoke(serviceBusSessionProcessorOptions); @@ -89,8 +89,7 @@ public AzureServiceBusSetupEnvironment AddDelayedSessionProcessor( serviceBusSessionProcessorOptions); var processor = new AzureServiceBusCommandSessionProcessor( serviceBusSessionProcessor, - _serviceCollection, - renewSessionLockInterval ?? TimeSpan.FromMinutes(1)); + _serviceCollection); return processor; }); From f2ac85b827f1fcc29d6237f18767ba0c99546781 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20K=C3=BCsters?= Date: Tue, 12 Dec 2023 10:09:51 +0100 Subject: [PATCH 2/2] Updated Azure.Messaging.ServiceBus --- .../Wemogy.CQRS.Extensions.AzureServiceBus.csproj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/extensions/azureServiceBus/Wemogy.CQRS.Extensions.AzureServiceBus/Wemogy.CQRS.Extensions.AzureServiceBus.csproj b/src/extensions/azureServiceBus/Wemogy.CQRS.Extensions.AzureServiceBus/Wemogy.CQRS.Extensions.AzureServiceBus.csproj index 163f56a..225fee4 100644 --- a/src/extensions/azureServiceBus/Wemogy.CQRS.Extensions.AzureServiceBus/Wemogy.CQRS.Extensions.AzureServiceBus.csproj +++ b/src/extensions/azureServiceBus/Wemogy.CQRS.Extensions.AzureServiceBus/Wemogy.CQRS.Extensions.AzureServiceBus.csproj @@ -14,7 +14,7 @@ - + runtime; build; native; contentfiles; analyzers; buildtransitive all