From 40a56d2f388ae26f9effc3892d98ddf7df5364ae Mon Sep 17 00:00:00 2001 From: Mahmoud Samir Date: Mon, 2 Dec 2024 08:41:44 +0300 Subject: [PATCH] Invoke consumer-defined action when exceptions raised during consumption Redis streams (#1618) * #1611 Enhanced logging and error handling in RedisConsumer Added new unit tests for various functionalities and configurations. Improved code formatting and consistency across the project. * update summary * OnConsumeError Updates update CapRedisOptions with OnConsumeError option --- CAP.sln | 6 + .../user-guide/en/transport/redis-streams.md | 2 +- .../Controllers/HomeController.cs | 75 ++++++----- samples/Samples.Redis.SqlServer/Dockerfile | 21 ++-- samples/Samples.Redis.SqlServer/Program.cs | 58 ++++++--- .../Samples.Redis.SqlServer.csproj | 33 ++--- samples/Samples.Redis.SqlServer/Startup.cs | 37 ------ .../Samples.Redis.SqlServer/appsettings.json | 2 +- .../docker-compose.yml | 24 ++-- .../CapOptions.Redis.Extensions.cs | 2 +- .../CapOptions.Redis.cs | 8 ++ .../DotNetCore.CAP.RedisStreams.csproj | 4 +- .../ICapOptionsExtension.Redis.cs | 9 +- .../IConnectionPool.Default.cs | 10 +- .../IConnectionPool.LazyConnection.cs | 33 ++--- .../IConsumerClient.Redis.cs | 55 ++++---- .../IConsumerClientFactory.Redis.cs | 17 +-- .../IRedis.Events.cs | 22 ++-- .../IRedisStream.Manager.Default.cs | 31 ++--- .../IRedisStream.Manager.Extensions.cs | 12 +- .../ITransport.Redis.cs | 19 +-- .../RedisErrorExtensions.cs | 8 +- .../TransportMessage.Redis.Exceptions.cs | 16 +++ .../TransportMessage.Redis.cs | 54 ++++++-- .../Internal/IConsumerRegister.Default.cs | 4 + src/DotNetCore.CAP/Transport/MqLogType.cs | 5 +- .../CapRedisOptionsPostConfigureTests.cs | 52 ++++++++ .../CapRedisOptionsTests.cs | 103 +++++++++++++++ .../DotNetCore.CAP.RedisStreams.Test.csproj | 30 +++++ .../RedisConnectionPoolTests.cs | 118 ++++++++++++++++++ .../RedisOptionsExtensionTests.cs | 59 +++++++++ 31 files changed, 664 insertions(+), 265 deletions(-) delete mode 100644 samples/Samples.Redis.SqlServer/Startup.cs create mode 100644 src/DotNetCore.CAP.RedisStreams/TransportMessage.Redis.Exceptions.cs create mode 100644 test/DotNetCore.CAP.RedisStreams.Test/CapRedisOptionsPostConfigureTests.cs create mode 100644 test/DotNetCore.CAP.RedisStreams.Test/CapRedisOptionsTests.cs create mode 100644 test/DotNetCore.CAP.RedisStreams.Test/DotNetCore.CAP.RedisStreams.Test.csproj create mode 100644 test/DotNetCore.CAP.RedisStreams.Test/RedisConnectionPoolTests.cs create mode 100644 test/DotNetCore.CAP.RedisStreams.Test/RedisOptionsExtensionTests.cs diff --git a/CAP.sln b/CAP.sln index 8ed771fb2..8f78f143f 100644 --- a/CAP.sln +++ b/CAP.sln @@ -91,6 +91,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.Dashboard.Jwt", "sam EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.Dashboard.K8s", "src\DotNetCore.CAP.Dashboard.K8s\DotNetCore.CAP.Dashboard.K8s.csproj", "{48655118-CEC3-4BD9-B510-64C1195C2729}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DotNetCore.CAP.RedisStreams.Test", "test\DotNetCore.CAP.RedisStreams.Test\DotNetCore.CAP.RedisStreams.Test.csproj", "{789E851F-37B4-49D6-B405-7EC9AA1D2CF8}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -225,6 +227,9 @@ Global {48655118-CEC3-4BD9-B510-64C1195C2729}.Debug|Any CPU.Build.0 = Debug|Any CPU {48655118-CEC3-4BD9-B510-64C1195C2729}.Release|Any CPU.ActiveCfg = Release|Any CPU {48655118-CEC3-4BD9-B510-64C1195C2729}.Release|Any CPU.Build.0 = Release|Any CPU + {789E851F-37B4-49D6-B405-7EC9AA1D2CF8}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {789E851F-37B4-49D6-B405-7EC9AA1D2CF8}.Release|Any CPU.ActiveCfg = Release|Any CPU + {789E851F-37B4-49D6-B405-7EC9AA1D2CF8}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -262,6 +267,7 @@ Global {D9681967-DAC2-43EF-999F-3727F1046711} = {C09CDAB0-6DD4-46E9-B7F3-3EF2A4741EA0} {F739A8C9-565F-4B1D-8F91-FEE056C03FBD} = {3A6B6931-A123-477A-9469-8B468B5385AF} {48655118-CEC3-4BD9-B510-64C1195C2729} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4} + {789E851F-37B4-49D6-B405-7EC9AA1D2CF8} = {C09CDAB0-6DD4-46E9-B7F3-3EF2A4741EA0} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {2E70565D-94CF-40B4-BFE1-AC18D5F736AB} diff --git a/docs/content/user-guide/en/transport/redis-streams.md b/docs/content/user-guide/en/transport/redis-streams.md index 9f586aaf3..d08fd2b46 100644 --- a/docs/content/user-guide/en/transport/redis-streams.md +++ b/docs/content/user-guide/en/transport/redis-streams.md @@ -40,7 +40,7 @@ NAME | DESCRIPTION | TYPE | DEFAULT Configuration | redis connection configuration (StackExchange.Redis) | ConfigurationOptions | ConfigurationOptions StreamEntriesCount | number of entries returned from a stream while reading | uint | 10 ConnectionPoolSize | number of connections pool | uint | 10 - +OnConsumeError | callback function that will be invoked when an error occurred during message consumption. | Func | null #### Redis Configuration Options If you need **more** native Redis related configuration options, you can set them in the `Configuration` option: diff --git a/samples/Samples.Redis.SqlServer/Controllers/HomeController.cs b/samples/Samples.Redis.SqlServer/Controllers/HomeController.cs index 46773ccd5..8334a36b3 100644 --- a/samples/Samples.Redis.SqlServer/Controllers/HomeController.cs +++ b/samples/Samples.Redis.SqlServer/Controllers/HomeController.cs @@ -1,54 +1,51 @@ using DotNetCore.CAP; using DotNetCore.CAP.Messages; using Microsoft.AspNetCore.Mvc; -using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; -using System.Threading.Tasks; -namespace Samples.Redis.SqlServer.Controllers +namespace Samples.Redis.SqlServer.Controllers; + +[ApiController] +[Route("[controller]/[action]")] +public class HomeController : ControllerBase { - [ApiController] - [Route("[controller]/[action]")] - public class HomeController : ControllerBase + private readonly ILogger _logger; + private readonly ICapPublisher _publisher; + private readonly IOptions _options; + + public HomeController(ILogger logger, ICapPublisher publisher, IOptions options) { - private readonly ILogger _logger; - private readonly ICapPublisher _publisher; - private readonly IOptions _options; - - public HomeController(ILogger logger, ICapPublisher publisher, IOptions options) - { - _logger = logger; - _publisher = publisher; - this._options = options; - } - - [HttpGet] - public async Task Publish([FromQuery] string message = "test-message") - { - await _publisher.PublishAsync(message, new Person() { Age = 11, Name = "James" }); - } - - [CapSubscribe("test-message")] - [CapSubscribe("test-message-1")] - [CapSubscribe("test-message-2")] - [CapSubscribe("test-message-3")] - [NonAction] - public void Subscribe(Person p, [FromCap] CapHeader header) - { - _logger.LogInformation($"{header[Headers.MessageName]} subscribed with value --> " + p); - } + _logger = logger; + _publisher = publisher; + this._options = options; + } + [HttpGet] + public async Task Publish([FromQuery] string message = "test-message") + { + await _publisher.PublishAsync(message, new Person() { Age = 11, Name = "James" }); } - public class Person + [CapSubscribe("test-message")] + [CapSubscribe("test-message-1")] + [CapSubscribe("test-message-2")] + [CapSubscribe("test-message-3")] + [NonAction] + public void Subscribe(Person p, [FromCap] CapHeader header) { - public string Name { get; set; } + _logger.LogInformation($"{header[Headers.MessageName]} subscribed with value --> " + p); + } - public int Age { get; set; } +} + +public class Person +{ + public string Name { get; set; } - public override string ToString() - { - return "Name:" + Name + ", Age:" + Age; - } + public int Age { get; set; } + + public override string ToString() + { + return "Name:" + Name + ", Age:" + Age; } } diff --git a/samples/Samples.Redis.SqlServer/Dockerfile b/samples/Samples.Redis.SqlServer/Dockerfile index 539ce3db5..efeec46a5 100644 --- a/samples/Samples.Redis.SqlServer/Dockerfile +++ b/samples/Samples.Redis.SqlServer/Dockerfile @@ -1,24 +1,31 @@ -#See https://aka.ms/containerfastmode to understand how Visual Studio uses this Dockerfile to build your images for faster debugging. +# See https://aka.ms/customizecontainer to learn how to customize your debug container and how Visual Studio uses this Dockerfile to build your images for faster debugging. -FROM mcr.microsoft.com/dotnet/aspnet:6.0 AS base +# This stage is used when running from VS in fast mode (Default for Debug configuration) +FROM mcr.microsoft.com/dotnet/aspnet:8.0 AS base +USER $APP_UID WORKDIR /app EXPOSE 80 -EXPOSE 443 -FROM mcr.microsoft.com/dotnet/sdk:6.0 AS build +# This stage is used to build the service project +FROM mcr.microsoft.com/dotnet/sdk:8.0 AS build +ARG BUILD_CONFIGURATION=Release WORKDIR /src +COPY ["src/Directory.Build.props", "src/"] COPY ["samples/Samples.Redis.SqlServer/Samples.Redis.SqlServer.csproj", "samples/Samples.Redis.SqlServer/"] COPY ["src/DotNetCore.CAP.RedisStreams/DotNetCore.CAP.RedisStreams.csproj", "src/DotNetCore.CAP.RedisStreams/"] COPY ["src/DotNetCore.CAP/DotNetCore.CAP.csproj", "src/DotNetCore.CAP/"] COPY ["src/DotNetCore.CAP.SqlServer/DotNetCore.CAP.SqlServer.csproj", "src/DotNetCore.CAP.SqlServer/"] -RUN dotnet restore "samples/Samples.Redis.SqlServer/Samples.Redis.SqlServer.csproj" +RUN dotnet restore "./samples/Samples.Redis.SqlServer/Samples.Redis.SqlServer.csproj" COPY . . WORKDIR "/src/samples/Samples.Redis.SqlServer" -RUN dotnet build "Samples.Redis.SqlServer.csproj" -c Release -o /app/build +RUN dotnet build "./Samples.Redis.SqlServer.csproj" -c $BUILD_CONFIGURATION -o /app/build +# This stage is used to publish the service project to be copied to the final stage FROM build AS publish -RUN dotnet publish "Samples.Redis.SqlServer.csproj" -c Release -o /app/publish +ARG BUILD_CONFIGURATION=Release +RUN dotnet publish "./Samples.Redis.SqlServer.csproj" -c $BUILD_CONFIGURATION -o /app/publish /p:UseAppHost=false +# This stage is used in production or when running from VS in regular mode (Default when not using the Debug configuration) FROM base AS final WORKDIR /app COPY --from=publish /app/publish . diff --git a/samples/Samples.Redis.SqlServer/Program.cs b/samples/Samples.Redis.SqlServer/Program.cs index 4cb124425..63c16ac33 100644 --- a/samples/Samples.Redis.SqlServer/Program.cs +++ b/samples/Samples.Redis.SqlServer/Program.cs @@ -1,20 +1,46 @@ -using Microsoft.AspNetCore.Hosting; -using Microsoft.Extensions.Hosting; -namespace Samples.Redis.SqlServer -{ - public class Program +using StackExchange.Redis; + +var builder = WebApplication.CreateBuilder(args); + +builder.Services + .AddControllers(); + +builder.Services + .AddEndpointsApiExplorer(); +builder.Services + .AddSwaggerGen(); + +builder.Services + .AddCap(options => { - public static void Main(string[] args) + options.UseRedis(redis => { - CreateHostBuilder(args).Build().Run(); - } - - public static IHostBuilder CreateHostBuilder(string[] args) => - Host.CreateDefaultBuilder(args) - .ConfigureWebHostDefaults(webBuilder => - { - webBuilder.UseStartup(); - }); - } + redis.Configuration = ConfigurationOptions.Parse("redis-node-0:6379,password=cap"); + redis.OnConsumeError = context => + { + throw new InvalidOperationException(""); + }; + }); + + options.UseSqlServer("Server=db;Database=master;User=sa;Password=P@ssw0rd;Encrypt=False"); + + options.UseDashboard(); + + }); + +var app = builder.Build(); + +if (app.Environment.IsDevelopment()) +{ + app.UseSwagger(); + app.UseSwaggerUI(); } + +app.UseHttpsRedirection(); + +app.UseAuthorization(); + +app.MapControllers(); + +app.Run(); diff --git a/samples/Samples.Redis.SqlServer/Samples.Redis.SqlServer.csproj b/samples/Samples.Redis.SqlServer/Samples.Redis.SqlServer.csproj index fa7203e22..c29967de4 100644 --- a/samples/Samples.Redis.SqlServer/Samples.Redis.SqlServer.csproj +++ b/samples/Samples.Redis.SqlServer/Samples.Redis.SqlServer.csproj @@ -1,16 +1,19 @@  - - - net8.0 - eb622624-fbc4-46d0-b006-dfe8e66df5bb - Linux - ..\.. - ..\..\docker-compose.dcproj - - - - - - - - + + net8.0 + eb622624-fbc4-46d0-b006-dfe8e66df5bb + Linux + enable + ..\.. + ..\..\docker-compose.dcproj + + + + + + + + + + + \ No newline at end of file diff --git a/samples/Samples.Redis.SqlServer/Startup.cs b/samples/Samples.Redis.SqlServer/Startup.cs deleted file mode 100644 index 317cc0bce..000000000 --- a/samples/Samples.Redis.SqlServer/Startup.cs +++ /dev/null @@ -1,37 +0,0 @@ -using Microsoft.AspNetCore.Builder; -using Microsoft.Extensions.Configuration; -using Microsoft.Extensions.DependencyInjection; - -namespace Samples.Redis.SqlServer -{ - public class Startup - { - public Startup(IConfiguration configuration) - { - Configuration = configuration; - } - - public IConfiguration Configuration { get; } - - public void ConfigureServices(IServiceCollection services) - { - services.AddControllers(); - - services.AddCap(options => - { - options.UseRedis("redis-node-0:6379,password=cap"); - - options.UseSqlServer("Server=db;Database=master;User=sa;Password=P@ssw0rd;"); - }); - } - - public void Configure(IApplicationBuilder app) - { - app.UseRouting(); - app.UseEndpoints(endpoints => - { - endpoints.MapControllers(); - }); - } - } -} diff --git a/samples/Samples.Redis.SqlServer/appsettings.json b/samples/Samples.Redis.SqlServer/appsettings.json index d9d9a9bff..5c308067e 100644 --- a/samples/Samples.Redis.SqlServer/appsettings.json +++ b/samples/Samples.Redis.SqlServer/appsettings.json @@ -6,5 +6,5 @@ "Microsoft.Hosting.Lifetime": "Information" } }, - "AllowedHosts": "*" + "AllowedHosts": "*" } diff --git a/samples/Samples.Redis.SqlServer/docker-compose.yml b/samples/Samples.Redis.SqlServer/docker-compose.yml index a74cd8b67..062949161 100644 --- a/samples/Samples.Redis.SqlServer/docker-compose.yml +++ b/samples/Samples.Redis.SqlServer/docker-compose.yml @@ -1,7 +1,6 @@ -version: '2' services: redis-node-0: - image: docker.io/bitnami/redis-cluster:6.2 + image: docker.io/bitnami/redis-cluster:7.0 volumes: - redis-cluster_data-0:/bitnami/redis/data environment: @@ -9,7 +8,7 @@ services: - 'REDIS_NODES=redis-node-0 redis-node-1 redis-node-2 redis-node-3 redis-node-4 redis-node-5' redis-node-1: - image: docker.io/bitnami/redis-cluster:6.2 + image: docker.io/bitnami/redis-cluster:7.0 volumes: - redis-cluster_data-1:/bitnami/redis/data environment: @@ -17,7 +16,7 @@ services: - 'REDIS_NODES=redis-node-0 redis-node-1 redis-node-2 redis-node-3 redis-node-4 redis-node-5' redis-node-2: - image: docker.io/bitnami/redis-cluster:6.2 + image: docker.io/bitnami/redis-cluster:7.0 volumes: - redis-cluster_data-2:/bitnami/redis/data environment: @@ -25,7 +24,7 @@ services: - 'REDIS_NODES=redis-node-0 redis-node-1 redis-node-2 redis-node-3 redis-node-4 redis-node-5' redis-node-3: - image: docker.io/bitnami/redis-cluster:6.2 + image: docker.io/bitnami/redis-cluster:7.0 volumes: - redis-cluster_data-3:/bitnami/redis/data environment: @@ -33,7 +32,7 @@ services: - 'REDIS_NODES=redis-node-0 redis-node-1 redis-node-2 redis-node-3 redis-node-4 redis-node-5' redis-node-4: - image: docker.io/bitnami/redis-cluster:6.2 + image: docker.io/bitnami/redis-cluster:7.0 volumes: - redis-cluster_data-4:/bitnami/redis/data environment: @@ -41,7 +40,9 @@ services: - 'REDIS_NODES=redis-node-0 redis-node-1 redis-node-2 redis-node-3 redis-node-4 redis-node-5' redis-node-5: - image: docker.io/bitnami/redis-cluster:6.2 + image: docker.io/bitnami/redis-cluster:7.0 + ports: + - 6379:6379 volumes: - redis-cluster_data-5:/bitnami/redis/data depends_on: @@ -63,14 +64,17 @@ services: - 1433:1433 environment: SA_PASSWORD: "P@ssw0rd" - ACCEPT_EULA: "Y" + ACCEPT_EULA: "Y" redis-sample: build: context: ../.. dockerfile: samples/Samples.Redis.SqlServer/Dockerfile + environment: + - ASPNETCORE_URLS:http://*:8080 + - ASPNETCORE_ENVIRONMENT=Development ports: - - 5000:80 + - 8080:8080 depends_on: - db - redis-node-5 @@ -87,4 +91,4 @@ volumes: redis-cluster_data-4: driver: local redis-cluster_data-5: - driver: local + driver: local \ No newline at end of file diff --git a/src/DotNetCore.CAP.RedisStreams/CapOptions.Redis.Extensions.cs b/src/DotNetCore.CAP.RedisStreams/CapOptions.Redis.Extensions.cs index d9929a2dd..6246acb66 100644 --- a/src/DotNetCore.CAP.RedisStreams/CapOptions.Redis.Extensions.cs +++ b/src/DotNetCore.CAP.RedisStreams/CapOptions.Redis.Extensions.cs @@ -36,7 +36,7 @@ public static CapOptions UseRedis(this CapOptions options, string connection) /// is null. public static CapOptions UseRedis(this CapOptions options, Action configure) { - if (configure is null) throw new ArgumentNullException(nameof(configure)); + ArgumentNullException.ThrowIfNull(configure); options.RegisterExtension(new RedisOptionsExtension(configure)); diff --git a/src/DotNetCore.CAP.RedisStreams/CapOptions.Redis.cs b/src/DotNetCore.CAP.RedisStreams/CapOptions.Redis.cs index fcced9e51..cf8663647 100644 --- a/src/DotNetCore.CAP.RedisStreams/CapOptions.Redis.cs +++ b/src/DotNetCore.CAP.RedisStreams/CapOptions.Redis.cs @@ -2,6 +2,8 @@ // Licensed under the MIT License. See License.txt in the project root for license information. using StackExchange.Redis; +using System; +using System.Threading.Tasks; // ReSharper disable once CheckNamespace namespace DotNetCore.CAP; @@ -24,4 +26,10 @@ public class CapRedisOptions /// Gets or sets the number of connections that can be used with redis server /// public uint ConnectionPoolSize { get; set; } + + /// + /// Callback function that will be invoked when an error occurred during message consumption. + /// + public Func? OnConsumeError { get; set; } + public record ConsumeErrorContext(Exception Exception, StreamEntry? Entry); } \ No newline at end of file diff --git a/src/DotNetCore.CAP.RedisStreams/DotNetCore.CAP.RedisStreams.csproj b/src/DotNetCore.CAP.RedisStreams/DotNetCore.CAP.RedisStreams.csproj index 0e8a3eec2..20c7fab52 100644 --- a/src/DotNetCore.CAP.RedisStreams/DotNetCore.CAP.RedisStreams.csproj +++ b/src/DotNetCore.CAP.RedisStreams/DotNetCore.CAP.RedisStreams.csproj @@ -1,9 +1,10 @@  - net6.0 + net6.0;net8.0 enable $(PackageTags);RedisStreams + preview @@ -17,6 +18,7 @@ + diff --git a/src/DotNetCore.CAP.RedisStreams/ICapOptionsExtension.Redis.cs b/src/DotNetCore.CAP.RedisStreams/ICapOptionsExtension.Redis.cs index aba2f0da2..156e4f584 100644 --- a/src/DotNetCore.CAP.RedisStreams/ICapOptionsExtension.Redis.cs +++ b/src/DotNetCore.CAP.RedisStreams/ICapOptionsExtension.Redis.cs @@ -14,14 +14,9 @@ // ReSharper disable once CheckNamespace namespace DotNetCore.CAP; -internal class RedisOptionsExtension : ICapOptionsExtension +internal class RedisOptionsExtension(Action configure) : ICapOptionsExtension { - private readonly Action _configure; - - public RedisOptionsExtension(Action configure) - { - _configure = configure ?? throw new ArgumentNullException(nameof(configure)); - } + private readonly Action _configure = configure ?? throw new ArgumentNullException(nameof(configure)); public void AddServices(IServiceCollection services) { diff --git a/src/DotNetCore.CAP.RedisStreams/IConnectionPool.Default.cs b/src/DotNetCore.CAP.RedisStreams/IConnectionPool.Default.cs index e8534600b..401c86910 100644 --- a/src/DotNetCore.CAP.RedisStreams/IConnectionPool.Default.cs +++ b/src/DotNetCore.CAP.RedisStreams/IConnectionPool.Default.cs @@ -14,7 +14,7 @@ namespace DotNetCore.CAP.RedisStreams; internal class RedisConnectionPool : IRedisConnectionPool, IDisposable { - private readonly ConcurrentBag _connections = new(); + private readonly ConcurrentBag _connections = []; private readonly ILoggerFactory _loggerFactory; private readonly SemaphoreSlim _poolLock = new(1); @@ -33,10 +33,8 @@ private AsyncLazyRedisConnection? QuietConnection { get { - return _poolAlreadyConfigured - ? _connections.OrderBy(static c => c.CreatedConnection?.ConnectionCapacity - ?? int.MaxValue).First() - : null; + return _poolAlreadyConfigured ? _connections.OrderBy(static c => c.CreatedConnection?.ConnectionCapacity ?? int.MaxValue) + .First() : null; } } @@ -71,7 +69,7 @@ private async Task Init() { await _poolLock.WaitAsync(); - if (_connections.Any()) return; + if (!_connections.IsEmpty) return; for (var i = 0; i < _redisOptions.ConnectionPoolSize; i++) { diff --git a/src/DotNetCore.CAP.RedisStreams/IConnectionPool.LazyConnection.cs b/src/DotNetCore.CAP.RedisStreams/IConnectionPool.LazyConnection.cs index a7e37dd25..6f6e77de0 100644 --- a/src/DotNetCore.CAP.RedisStreams/IConnectionPool.LazyConnection.cs +++ b/src/DotNetCore.CAP.RedisStreams/IConnectionPool.LazyConnection.cs @@ -9,13 +9,11 @@ namespace DotNetCore.CAP.RedisStreams; -public class AsyncLazyRedisConnection : Lazy> +public class AsyncLazyRedisConnection( + CapRedisOptions redisOptions, + ILogger logger) + : Lazy>(() => ConnectAsync(redisOptions, logger)) { - public AsyncLazyRedisConnection(CapRedisOptions redisOptions, - ILogger logger) : base(() => ConnectAsync(redisOptions, logger)) - { - } - public RedisConnection? CreatedConnection => IsValueCreated ? Value.GetAwaiter().GetResult() : null; @@ -27,13 +25,13 @@ public TaskAwaiter GetAwaiter() private static async Task ConnectAsync(CapRedisOptions redisOptions, ILogger logger) { - var attemp = 1; + var attempt = 1; var redisLogger = new RedisLogger(logger); ConnectionMultiplexer? connection = null; - while (attemp <= 5) + while (attempt <= 5) { connection = await ConnectionMultiplexer.ConnectAsync(redisOptions.Configuration!, redisLogger) .ConfigureAwait(false); @@ -42,36 +40,29 @@ private static async Task ConnectAsync(CapRedisOptions redisOpt if (!connection.IsConnected) { - logger.LogWarning( - $"Can't establish redis connection,trying to establish connection [attemp {attemp}]."); + logger.LogWarning("Can't establish redis connection,trying to establish connection [attempt {attempt}].", attempt); await Task.Delay(TimeSpan.FromSeconds(2)) .ConfigureAwait(false); - ++attemp; + ++attempt; } else { - attemp = 6; + attempt = 6; } } - if (connection == null) throw new Exception($"Can't establish redis connection,after [{attemp}] attemps."); + if (connection == null) throw new Exception($"Can't establish redis connection,after [{attempt}] attempts."); return new RedisConnection(connection); } } -public class RedisConnection : IDisposable +public class RedisConnection(IConnectionMultiplexer connection) : IDisposable { private bool _isDisposed; - - public RedisConnection(IConnectionMultiplexer connection) - { - Connection = connection ?? throw new ArgumentNullException(nameof(connection)); - } - - public IConnectionMultiplexer Connection { get; } + public IConnectionMultiplexer Connection { get; } = connection ?? throw new ArgumentNullException(nameof(connection)); public long ConnectionCapacity => Connection.GetCounters().TotalOutstanding; public void Dispose() diff --git a/src/DotNetCore.CAP.RedisStreams/IConsumerClient.Redis.cs b/src/DotNetCore.CAP.RedisStreams/IConsumerClient.Redis.cs index 1b5b55d14..308534aa6 100644 --- a/src/DotNetCore.CAP.RedisStreams/IConsumerClient.Redis.cs +++ b/src/DotNetCore.CAP.RedisStreams/IConsumerClient.Redis.cs @@ -14,31 +14,17 @@ namespace DotNetCore.CAP.RedisStreams; -internal class RedisConsumerClient : IConsumerClient +internal class RedisConsumerClient( + string _groupId, + byte _groupConcurrent, + IRedisStreamManager _redis, + IOptions _options, + ILogger _logger + ) : IConsumerClient { - private readonly string _groupId; - private readonly byte _groupConcurrent; - private readonly SemaphoreSlim _semaphore; - private readonly ILogger _logger; - private readonly IOptions _options; - private readonly IRedisStreamManager _redis; + private readonly SemaphoreSlim _semaphore = new(_groupConcurrent); private string[] _topics = default!; - public RedisConsumerClient(string groupId, - byte groupConcurrent, - IRedisStreamManager redis, - IOptions options, - ILogger logger - ) - { - _groupId = groupId; - _groupConcurrent = groupConcurrent; - _semaphore = new SemaphoreSlim(groupConcurrent); - _redis = redis; - _options = options; - _logger = logger; - } - public Func? OnMessageCallback { get; set; } public Action? OnLogCallback { get; set; } @@ -47,7 +33,7 @@ ILogger logger public void Subscribe(IEnumerable topics) { - if (topics == null) throw new ArgumentNullException(nameof(topics)); + ArgumentNullException.ThrowIfNull(topics); foreach (var topic in topics) { @@ -134,20 +120,35 @@ async Task Consume(RedisValue position, RedisStream stream, StreamEntry entry) } catch (Exception ex) { - _logger.LogError(ex.Message, ex); + _logger.LogError(ex, message: "Redis entry {entryId} on stream {streamKey} at position {position} of group {groupId} is not valid for Cap, see inner exception for more details.", entry.Id, stream.Key, position, _groupId); + var logArgs = new LogMessageEventArgs { - LogType = MqLogType.ConsumeError, + LogType = MqLogType.RedisConsumeError, Reason = ex.ToString() }; - OnLogCallback!(logArgs); + + try + { + var onError = _options.Value.OnConsumeError?.Invoke(new CapRedisOptions.ConsumeErrorContext(ex, entry)); + + await (onError ?? Task.CompletedTask).ConfigureAwait(false); + } + catch (Exception onError) + { + _logger.LogError(onError, "Unhandled exception occurred in {action} action, Exception has been caught.", nameof(CapRedisOptions.OnConsumeError)); + } + finally + { + OnLogCallback!(logArgs); + } } finally { var positionName = position == StreamPosition.Beginning ? nameof(StreamPosition.Beginning) : nameof(StreamPosition.NewMessages); - _logger.LogDebug($"Redis stream entry [{entry.Id}] [position : {positionName}] was delivered."); + _logger.LogDebug("Redis stream entry [{entryId}] [position : {positionName}] was delivered.", entry.Id, positionName); } } } diff --git a/src/DotNetCore.CAP.RedisStreams/IConsumerClientFactory.Redis.cs b/src/DotNetCore.CAP.RedisStreams/IConsumerClientFactory.Redis.cs index 941240967..0383b4fa2 100644 --- a/src/DotNetCore.CAP.RedisStreams/IConsumerClientFactory.Redis.cs +++ b/src/DotNetCore.CAP.RedisStreams/IConsumerClientFactory.Redis.cs @@ -7,20 +7,11 @@ namespace DotNetCore.CAP.RedisStreams; -internal class RedisConsumerClientFactory : IConsumerClientFactory +internal class RedisConsumerClientFactory( + IOptions _redisOptions, + IRedisStreamManager _redis, + ILogger _logger) : IConsumerClientFactory { - private readonly ILogger _logger; - private readonly IRedisStreamManager _redis; - private readonly IOptions _redisOptions; - - public RedisConsumerClientFactory(IOptions redisOptions, IRedisStreamManager redis, - ILogger logger) - { - _redisOptions = redisOptions; - _redis = redis; - _logger = logger; - } - public IConsumerClient Create(string groupName, byte groupConcurrent) { return new RedisConsumerClient(groupName, groupConcurrent, _redis, _redisOptions, _logger); diff --git a/src/DotNetCore.CAP.RedisStreams/IRedis.Events.cs b/src/DotNetCore.CAP.RedisStreams/IRedis.Events.cs index 57519dff9..b18939eba 100644 --- a/src/DotNetCore.CAP.RedisStreams/IRedis.Events.cs +++ b/src/DotNetCore.CAP.RedisStreams/IRedis.Events.cs @@ -9,32 +9,34 @@ namespace DotNetCore.CAP.RedisStreams; internal class RedisEvents { + private readonly IConnectionMultiplexer _connection; private readonly ILogger _logger; public RedisEvents(IConnectionMultiplexer connection, ILogger logger) { _logger = logger; - connection.ErrorMessage += Connection_ErrorMessage; - connection.ConnectionRestored += Connection_ConnectionRestored; - connection.ConnectionFailed += Connection_ConnectionFailed; + _connection = connection; + _connection.ErrorMessage += Connection_ErrorMessage; + _connection.ConnectionRestored += Connection_ConnectionRestored; + _connection.ConnectionFailed += Connection_ConnectionFailed; } private void Connection_ConnectionFailed(object? sender, ConnectionFailedEventArgs e) { - _logger.LogError(e.Exception, - $"Connection failed!, {e.Exception?.Message}, for endpoint:{e.EndPoint}, failure type:{e.FailureType}, connection type:{e.ConnectionType}"); + _logger.LogError(e.Exception, "Connection failed!, {message}, for endpoint:{endPoint}, failure type:{failureType}, connection type:{connectionType}", e.Exception?.Message, e.EndPoint, e.FailureType, e.ConnectionType); } private void Connection_ConnectionRestored(object? sender, ConnectionFailedEventArgs e) { - _logger.LogWarning( - $"Connection restored back!, {e.Exception?.Message}, for endpoint:{e.EndPoint}, failure type:{e.FailureType}, connection type:{e.ConnectionType}"); + _logger.LogWarning("Connection restored back!, {message}, for endpoint:{endPoint}, failure type:{failureType}, connection type:{connectionType}", e.Exception?.Message, e.EndPoint, e.FailureType, e.ConnectionType); } private void Connection_ErrorMessage(object? sender, RedisErrorEventArgs e) { if (e.Message.GetRedisErrorType() == RedisErrorTypes.Unknown) - _logger.LogError($"Server replied with error, {e.Message}, for endpoint:{e.EndPoint}"); + { + _logger.LogError("Server replied with error, {message}, for endpoint:{endPoint}", e.Message, e.EndPoint); + } } } @@ -42,9 +44,9 @@ internal static class RedisConnectionExtensions { public static void LogEvents(this IConnectionMultiplexer connection, ILogger logger) { - if (connection is null) throw new ArgumentNullException(nameof(connection)); + ArgumentNullException.ThrowIfNull(connection); - if (logger is null) throw new ArgumentNullException(nameof(logger)); + ArgumentNullException.ThrowIfNull(logger); _ = new RedisEvents(connection, logger); } diff --git a/src/DotNetCore.CAP.RedisStreams/IRedisStream.Manager.Default.cs b/src/DotNetCore.CAP.RedisStreams/IRedisStream.Manager.Default.cs index bcd0114f1..08efa8400 100644 --- a/src/DotNetCore.CAP.RedisStreams/IRedisStream.Manager.Default.cs +++ b/src/DotNetCore.CAP.RedisStreams/IRedisStream.Manager.Default.cs @@ -13,21 +13,14 @@ namespace DotNetCore.CAP.RedisStreams; -internal class RedisStreamManager : IRedisStreamManager +internal class RedisStreamManager( + IRedisConnectionPool _connectionsPool, + IOptions options, + ILogger _logger) : IRedisStreamManager { - private readonly IRedisConnectionPool _connectionsPool; - private readonly ILogger _logger; - private readonly CapRedisOptions _options; + private readonly CapRedisOptions _options = options.Value; private IConnectionMultiplexer? _redis; - public RedisStreamManager(IRedisConnectionPool connectionsPool, IOptions options, - ILogger logger) - { - _options = options.Value; - _connectionsPool = connectionsPool; - _logger = logger; - } - public async Task CreateStreamWithConsumerGroupAsync(string stream, string consumerGroup) { await ConnectAsync() @@ -81,8 +74,7 @@ public async IAsyncEnumerable> PollStreamsPendingMessag yield return result; //Once we consumed our history of pending messages, we can break the loop. - if (result.All(s => s.Entries.Length < _options.StreamEntriesCount)) - break; + if (result.All(s => s.Entries.Length < _options.StreamEntriesCount)) break; token.WaitHandle.WaitOne(pollDelay); } @@ -104,7 +96,7 @@ private async Task> TryReadConsumerGroupAsync(string co { token.ThrowIfCancellationRequested(); - var createdPositions = new List(); + List createdPositions = []; await ConnectAsync() .ConfigureAwait(false); @@ -118,12 +110,11 @@ await ConnectAsync() createdPositions.Add(position); } - if (!createdPositions.Any()) return Array.Empty(); + if (createdPositions.Count == 0) return []; //calculate keys HashSlots to start reading per HashSlot var groupedPositions = createdPositions.GroupBy(s => _redis.GetHashSlot(s.Key)) - .Select(group => database.StreamReadGroupAsync(group.ToArray(), consumerGroup, consumerGroup, - (int)_options.StreamEntriesCount)); + .Select(group => database.StreamReadGroupAsync([.. group], consumerGroup, consumerGroup, (int)_options.StreamEntriesCount)); var readSet = await Task.WhenAll(groupedPositions) .ConfigureAwait(false); @@ -136,10 +127,10 @@ await ConnectAsync() } catch (Exception ex) { - _logger.LogError(ex, $"Redis error when trying read consumer group {consumerGroup}"); + _logger.LogError(ex, "Redis error when trying read consumer group {consumerGroup}", consumerGroup); } - return Array.Empty(); + return []; } private async Task ConnectAsync() diff --git a/src/DotNetCore.CAP.RedisStreams/IRedisStream.Manager.Extensions.cs b/src/DotNetCore.CAP.RedisStreams/IRedisStream.Manager.Extensions.cs index 3de0fb241..285991959 100644 --- a/src/DotNetCore.CAP.RedisStreams/IRedisStream.Manager.Extensions.cs +++ b/src/DotNetCore.CAP.RedisStreams/IRedisStream.Manager.Extensions.cs @@ -28,12 +28,12 @@ await database.TryGetOrCreateStreamConsumerGroupAsync(position.Key, consumerGrou catch (Exception ex) { if (ex.GetRedisErrorType() == RedisErrorTypes.Unknown) - logger?.LogError(ex, - $"Redis error while creating consumer group [{consumerGroup}] of stream [{position.Key}]"); + { + logger?.LogError(ex, "Redis error while creating consumer group [{consumerGroup}] of stream [{position}]", consumerGroup, position.Key); + } } - if (created) - yield return position; + if (created) yield return position; } } @@ -66,8 +66,8 @@ private static async Task TryGetOrCreateStreamGroupAsync(this IDatabase database try { var groupInfo = await database.StreamGroupInfoAsync(stream); - if (groupInfo.Any(g => g.Name == consumerGroup)) - return; + + if (groupInfo.Any(g => g.Name == consumerGroup)) return; await database.StreamCreateConsumerGroupAsync(stream, consumerGroup, StreamPosition.NewMessages) .ConfigureAwait(false); diff --git a/src/DotNetCore.CAP.RedisStreams/ITransport.Redis.cs b/src/DotNetCore.CAP.RedisStreams/ITransport.Redis.cs index 8bf9dfd6d..7d677a689 100644 --- a/src/DotNetCore.CAP.RedisStreams/ITransport.Redis.cs +++ b/src/DotNetCore.CAP.RedisStreams/ITransport.Redis.cs @@ -11,19 +11,12 @@ namespace DotNetCore.CAP.RedisStreams; -internal class RedisTransport : ITransport +internal class RedisTransport( + IRedisStreamManager _redis, + IOptions options, + ILogger _logger) : ITransport { - private readonly ILogger _logger; - private readonly CapRedisOptions _options; - private readonly IRedisStreamManager _redis; - - public RedisTransport(IRedisStreamManager redis, IOptions options, - ILogger logger) - { - _redis = redis; - _options = options.Value; - _logger = logger; - } + private readonly CapRedisOptions _options = options.Value; public BrokerAddress BrokerAddress => new("redis", _options.Endpoint); @@ -34,7 +27,7 @@ public async Task SendAsync(TransportMessage message) await _redis.PublishAsync(message.GetName(), message.AsStreamEntries()) .ConfigureAwait(false); - _logger.LogDebug($"Redis message [{message.GetName()}] has been published."); + _logger.LogDebug("Redis message [{message}] has been published.",message.GetName()); return OperateResult.Success; } diff --git a/src/DotNetCore.CAP.RedisStreams/RedisErrorExtensions.cs b/src/DotNetCore.CAP.RedisStreams/RedisErrorExtensions.cs index bb1e64536..0379ce6d3 100644 --- a/src/DotNetCore.CAP.RedisStreams/RedisErrorExtensions.cs +++ b/src/DotNetCore.CAP.RedisStreams/RedisErrorExtensions.cs @@ -9,11 +9,15 @@ internal static class RedisErrorExtensions { public static RedisErrorTypes GetRedisErrorType(this string redisError) { - if (string.Equals("BUSYGROUP Consumer Group name already exists", redisError, - StringComparison.InvariantCultureIgnoreCase)) return RedisErrorTypes.GroupAlreadyExists; + if (string.Equals("BUSYGROUP Consumer Group name already exists", redisError, StringComparison.InvariantCultureIgnoreCase)) + { + return RedisErrorTypes.GroupAlreadyExists; + } if (string.Equals("ERR no such key", redisError, StringComparison.InvariantCultureIgnoreCase)) + { return RedisErrorTypes.NoGroupInfoExists; + } return RedisErrorTypes.Unknown; } diff --git a/src/DotNetCore.CAP.RedisStreams/TransportMessage.Redis.Exceptions.cs b/src/DotNetCore.CAP.RedisStreams/TransportMessage.Redis.Exceptions.cs new file mode 100644 index 000000000..6c6167496 --- /dev/null +++ b/src/DotNetCore.CAP.RedisStreams/TransportMessage.Redis.Exceptions.cs @@ -0,0 +1,16 @@ +// Copyright (c) .NET Core Community. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using StackExchange.Redis; + +namespace DotNetCore.CAP.RedisStreams; + +internal class RedisConsumeMissingHeadersException(StreamEntry entry) : Exception(message: $"Redis entry [{entry.Id}] is missing Cap headers."); + +internal class RedisConsumeMissingBodyException(StreamEntry entry) : Exception(message: $"Redis entry [{entry.Id}] is missing Cap body."); + +internal class RedisConsumeInvalidHeadersException(StreamEntry entry, Exception ex) + : Exception(message: $"Redis entry [{entry.Id}] has not headers that are formatted properly as Cap headers.", ex); + +internal class RedisConsumeInvalidBodyException(StreamEntry entry, Exception ex) : Exception(message: $"Redis entry [{entry.Id}] has not body that is formatted properly as Cap body.", ex); \ No newline at end of file diff --git a/src/DotNetCore.CAP.RedisStreams/TransportMessage.Redis.cs b/src/DotNetCore.CAP.RedisStreams/TransportMessage.Redis.cs index c01659887..4eef6806c 100644 --- a/src/DotNetCore.CAP.RedisStreams/TransportMessage.Redis.cs +++ b/src/DotNetCore.CAP.RedisStreams/TransportMessage.Redis.cs @@ -3,6 +3,7 @@ using System; using System.Collections.Generic; +using System.Linq; using System.Text.Json; using DotNetCore.CAP.Messages; using StackExchange.Redis; @@ -13,29 +14,60 @@ internal static class RedisMessage { private const string Headers = "headers"; private const string Body = "body"; + private readonly static JsonSerializerOptions JSON_OPTIONS = new(JsonSerializerDefaults.Web); public static NameValueEntry[] AsStreamEntries(this TransportMessage message) { - return new[] - { + return + [ new NameValueEntry(Headers, ToJson(message.Headers)), new NameValueEntry(Body, ToJson(message.Body.ToArray())) - }; + ]; } public static TransportMessage Create(StreamEntry streamEntry, string? groupId = null) { - var headersRaw = streamEntry[Headers]; - if (headersRaw.IsNullOrEmpty) - throw new ArgumentException($"Redis stream entry with id {streamEntry.Id} missing cap headers"); + IDictionary headers; + byte[]? body; - var headers = JsonSerializer.Deserialize>(headersRaw!)!; + var streamDict = streamEntry.Values.ToDictionary(c => c.Name, c => c.Value); - var bodyRaw = streamEntry[Body]; + if (!streamDict.TryGetValue(Headers, out var headersRaw) || headersRaw.IsNullOrEmpty) + { + throw new RedisConsumeMissingHeadersException(streamEntry); + } - var body = !bodyRaw.IsNullOrEmpty ? JsonSerializer.Deserialize(bodyRaw!) : null; + if (!streamDict.TryGetValue(Body, out var bodyRaw)) + { + throw new RedisConsumeMissingBodyException(streamEntry); + } - headers.TryAdd(Messages.Headers.Group, groupId); + try + { + headers = JsonSerializer.Deserialize>(json: headersRaw!, JSON_OPTIONS)!; + } + catch (Exception ex) + { + throw new RedisConsumeInvalidHeadersException(streamEntry, ex); + } + + if (!bodyRaw.IsNullOrEmpty) + { + try + { + body = JsonSerializer.Deserialize(json: bodyRaw!, JSON_OPTIONS); + } + catch (Exception ex) + { + throw new RedisConsumeInvalidBodyException(streamEntry, ex); + } + } + else body = null; + + if (!string.IsNullOrEmpty(groupId)) + { + headers[Messages.Headers.Group] = groupId; + } return new TransportMessage(headers, body); } @@ -43,6 +75,6 @@ public static TransportMessage Create(StreamEntry streamEntry, string? groupId = private static RedisValue ToJson(object? obj) { if (obj == null) return RedisValue.Null; - return JsonSerializer.Serialize(obj, new JsonSerializerOptions(JsonSerializerDefaults.Web)); + return JsonSerializer.Serialize(obj, JSON_OPTIONS); } } \ No newline at end of file diff --git a/src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs b/src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs index 872fc16ac..385838836 100644 --- a/src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs +++ b/src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs @@ -328,6 +328,10 @@ private void WriteLog(LogMessageEventArgs logmsg) "AmazonSQS subscriber change message's visibility failed, message isn't in flight. --> " + logmsg.Reason); break; + case MqLogType.RedisConsumeError: + _isHealthy = true; + _logger.LogError("Redis client consume error. --> {reason}", logmsg.Reason); + break; default: throw new ArgumentOutOfRangeException(); } diff --git a/src/DotNetCore.CAP/Transport/MqLogType.cs b/src/DotNetCore.CAP/Transport/MqLogType.cs index ae6f39b4d..bcc27872d 100644 --- a/src/DotNetCore.CAP/Transport/MqLogType.cs +++ b/src/DotNetCore.CAP/Transport/MqLogType.cs @@ -27,7 +27,10 @@ public enum MqLogType //Amazon SQS InvalidIdFormat, - MessageNotInflight + MessageNotInflight, + + //RedisStreams + RedisConsumeError } public class LogMessageEventArgs : EventArgs diff --git a/test/DotNetCore.CAP.RedisStreams.Test/CapRedisOptionsPostConfigureTests.cs b/test/DotNetCore.CAP.RedisStreams.Test/CapRedisOptionsPostConfigureTests.cs new file mode 100644 index 000000000..c78bd36f9 --- /dev/null +++ b/test/DotNetCore.CAP.RedisStreams.Test/CapRedisOptionsPostConfigureTests.cs @@ -0,0 +1,52 @@ +using StackExchange.Redis; +using System.Net; + +namespace DotNetCore.CAP.RedisStreams.Test; + +public class CapRedisOptionsPostConfigureTests +{ + [Fact] + public void PostConfigure_Should_Set_Default_Values() + { + // Arrange + var options = new CapRedisOptions(); + var postConfigure = new CapRedisOptionsPostConfigure(); + + // Act + postConfigure.PostConfigure(null, options); + + // Assert + Assert.NotNull(options.Configuration); + Assert.Equal((uint)10, options.StreamEntriesCount); + Assert.Equal((uint)10, options.ConnectionPoolSize); + Assert.Single(options.Configuration.EndPoints); + Assert.Equal($"{IPAddress.Loopback}:6379", options.Configuration.EndPoints.First().ToString()); + } + + [Fact] + public void PostConfigure_Should_Not_Override_Existing_Values() + { + // Arrange + var options = new CapRedisOptions + { + StreamEntriesCount = 50, + ConnectionPoolSize = 100, + Configuration = new ConfigurationOptions + { + EndPoints = { "localhost:6379" } + } + }; + + var postConfigure = new CapRedisOptionsPostConfigure(); + + // Act + postConfigure.PostConfigure(null, options); + + // Assert + Assert.NotNull(options.Configuration); + Assert.Equal((uint)50, options.StreamEntriesCount); + Assert.Equal((uint)100, options.ConnectionPoolSize); + Assert.Single(options.Configuration.EndPoints); + Assert.Contains("localhost:6379", options.Configuration.ToString()); + } +} diff --git a/test/DotNetCore.CAP.RedisStreams.Test/CapRedisOptionsTests.cs b/test/DotNetCore.CAP.RedisStreams.Test/CapRedisOptionsTests.cs new file mode 100644 index 000000000..8bc144074 --- /dev/null +++ b/test/DotNetCore.CAP.RedisStreams.Test/CapRedisOptionsTests.cs @@ -0,0 +1,103 @@ +using StackExchange.Redis; + +namespace DotNetCore.CAP.RedisStreams.Test; +public class CapRedisOptionsTests +{ + [Fact] + public void Default_Endpoint_Should_Be_Empty() + { + // Arrange + var options = new CapRedisOptions(); + + // Act + var endpoint = options.Endpoint; + + // Assert + Assert.Equal(string.Empty, endpoint); + } + + [Fact] + public void Endpoint_Should_Return_Correct_Value_When_Configuration_Is_Set() + { + // Arrange + var options = new CapRedisOptions + { + Configuration = new ConfigurationOptions + { + EndPoints = { "localhost:6379" } + } + }; + + // Act + var endpoint = options.Endpoint; + + // Assert + Assert.Contains("localhost:6379", endpoint); + } + + [Fact] + public void StreamEntriesCount_Should_Be_Set_Correctly() + { + // Arrange + CapRedisOptions options = new () + { + // Act + StreamEntriesCount = 10 + }; + + // Assert + Assert.Equal((uint)10, options.StreamEntriesCount); + } + + [Fact] + public void ConnectionPoolSize_Should_Be_Set_Correctly() + { + // Arrange + var options = new CapRedisOptions + { + // Act + ConnectionPoolSize = 5 + }; + + // Assert + Assert.Equal((uint)5, options.ConnectionPoolSize); + } + + [Fact] + public void OnConsumeError_Should_Invoke_Correctly() + { + // Arrange + var options = new CapRedisOptions(); + var invoked = false; + + options.OnConsumeError = context => + { + invoked = true; + Assert.NotNull(context.Exception); + Assert.Equal("Test exception", context.Exception.Message); + Assert.Null(context.Entry); + return Task.CompletedTask; + }; + + var errorContext = new CapRedisOptions.ConsumeErrorContext(new Exception("Test exception"), null); + + // Act + options.OnConsumeError?.Invoke(errorContext); + + // Assert + Assert.True(invoked); + } + + [Fact] + public void ConsumeErrorContext_Should_Hold_Correct_Values() + { + // Arrange + var exception = new Exception("Error message"); + var streamEntry = new StreamEntry("entryId", null); + var context = new CapRedisOptions.ConsumeErrorContext(exception, streamEntry); + + // Assert + Assert.Equal(exception, context.Exception); + Assert.Equal(streamEntry, context.Entry); + } +} diff --git a/test/DotNetCore.CAP.RedisStreams.Test/DotNetCore.CAP.RedisStreams.Test.csproj b/test/DotNetCore.CAP.RedisStreams.Test/DotNetCore.CAP.RedisStreams.Test.csproj new file mode 100644 index 000000000..0b6c8c248 --- /dev/null +++ b/test/DotNetCore.CAP.RedisStreams.Test/DotNetCore.CAP.RedisStreams.Test.csproj @@ -0,0 +1,30 @@ + + + + net8.0 + enable + enable + false + true + + + + + + + + + + + + + + + + + + + + + + diff --git a/test/DotNetCore.CAP.RedisStreams.Test/RedisConnectionPoolTests.cs b/test/DotNetCore.CAP.RedisStreams.Test/RedisConnectionPoolTests.cs new file mode 100644 index 000000000..d40b848e9 --- /dev/null +++ b/test/DotNetCore.CAP.RedisStreams.Test/RedisConnectionPoolTests.cs @@ -0,0 +1,118 @@ +using System.Collections.Concurrent; +using System.Reflection; +using System.Runtime.CompilerServices; +using DotNet.Testcontainers.Builders; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Moq; +using StackExchange.Redis; +using Testcontainers.Redis; + +namespace DotNetCore.CAP.RedisStreams.Test; + +public class RedisConnectionPoolTests : IAsyncLifetime +{ + private readonly Mock> _optionsMock; + private readonly Mock _loggerFactoryMock; + private readonly Mock> _loggerMock; + private readonly CapRedisOptions _redisOptions; + private RedisContainer? _redisContainer; + + public RedisConnectionPoolTests() + { + _redisOptions = new CapRedisOptions + { + ConnectionPoolSize = 5, + Configuration = ConfigurationOptions.Parse("localhost:6379") + }; + + _optionsMock = new Mock>(); + _optionsMock.Setup(o => o.Value).Returns(_redisOptions); + + // Mock ILoggerFactory and ILogger for AsyncLazyRedisConnection + _loggerMock = new Mock>(); + _loggerFactoryMock = new Mock(); + _loggerFactoryMock + .Setup(factory => factory.CreateLogger(It.IsAny())) + .Returns(_loggerMock.Object); + } + + [Fact] + public void Init_Should_Create_Correct_Number_Of_Connections() + { + // Arrange + var pool = new RedisConnectionPool(_optionsMock.Object, _loggerFactoryMock.Object); + + // Act + var connectionsField = typeof(RedisConnectionPool) + .GetField("_connections", BindingFlags.NonPublic | BindingFlags.Instance); + + var connections = (ConcurrentBag)connectionsField!.GetValue(pool)!; + + // Assert + Assert.NotNull(connections); + Assert.Equal(_redisOptions.ConnectionPoolSize, (uint)connections.Count); + } + + [Fact] + public async Task ConnectAsync_Should_Return_Available_Connection() + { + // Arrange + var pool = new RedisConnectionPool(_optionsMock.Object, _loggerFactoryMock.Object); + + // Act + var connection = await pool.ConnectAsync(); + + // Assert + Assert.NotNull(connection); + Assert.IsAssignableFrom(connection); + } + + [Fact] + public void Dispose_Should_Cleanup_Connections() + { + // Arrange + var pool = new RedisConnectionPool(_optionsMock.Object, _loggerFactoryMock.Object); + + // Act + pool.Dispose(); + + var isDisposedField = typeof(RedisConnectionPool) + .GetField("_isDisposed", BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance); + + var isDisposed = (bool)isDisposedField!.GetValue(pool)!; + + // Assert + Assert.True(isDisposed); + } + + [Fact] + public void Dispose_Should_Not_Throw_If_Already_Disposed() + { + // Arrange + var pool = new RedisConnectionPool(_optionsMock.Object, _loggerFactoryMock.Object); + + // Act & Assert + pool.Dispose(); + var exception = Record.Exception(() => pool.Dispose()); + Assert.Null(exception); + } + + public Task InitializeAsync() + { + // Create a Redis container using TestContainers + _redisContainer = new RedisBuilder() + .WithImage("redis:7.0") + .WithPortBinding(6379, 6379) + .WithWaitStrategy(Wait.ForUnixContainer().UntilPortIsAvailable(6379)) + .Build(); + + return _redisContainer.StartAsync(); + } + + public Task DisposeAsync() + { + var task = _redisContainer?.StopAsync(); + return task ?? Task.CompletedTask; + } +} diff --git a/test/DotNetCore.CAP.RedisStreams.Test/RedisOptionsExtensionTests.cs b/test/DotNetCore.CAP.RedisStreams.Test/RedisOptionsExtensionTests.cs new file mode 100644 index 000000000..f0e3140bd --- /dev/null +++ b/test/DotNetCore.CAP.RedisStreams.Test/RedisOptionsExtensionTests.cs @@ -0,0 +1,59 @@ +using DotNetCore.CAP.Transport; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Options; + +namespace DotNetCore.CAP.RedisStreams.Test; + +public class RedisOptionsExtensionTests +{ + [Fact] + public void AddServices_Should_Register_All_Services() + { + // Arrange + var services = new ServiceCollection(); + var configureOptions = new Action(options => { }); + + var extension = new RedisOptionsExtension(configureOptions); + + // Act + extension.AddServices(services); + services.AddLogging(); + + // Assert + var serviceProvider = services.BuildServiceProvider(); + + Assert.NotNull(serviceProvider.GetService()); + Assert.NotNull(serviceProvider.GetService()); + Assert.NotNull(serviceProvider.GetService()); + Assert.NotNull(serviceProvider.GetService()); + Assert.NotNull(serviceProvider.GetService()); + + // Verify the post-configuration for CapRedisOptions + var postConfig = serviceProvider.GetServices>(); + Assert.Single(postConfig); + } + + [Fact] + public void AddServices_Should_Configure_CapRedisOptions() + { + // Arrange + var services = new ServiceCollection(); + var configureOptions = new Action(options => + { + options.StreamEntriesCount = 20; + options.ConnectionPoolSize = 30; + }); + + var extension = new RedisOptionsExtension(configureOptions); + + // Act + extension.AddServices(services); + var serviceProvider = services.BuildServiceProvider(); + var options = serviceProvider.GetService>()?.Value; + + // Assert + Assert.NotNull(options); + Assert.Equal((uint)20, options!.StreamEntriesCount); + Assert.Equal((uint)30, options.ConnectionPoolSize); + } +}