Skip to content

Commit

Permalink
Renamed consumers and added sleep time to pollingworker
Browse files Browse the repository at this point in the history
  • Loading branch information
tnickelsen committed Jun 14, 2024
1 parent e8cc88d commit 80796af
Show file tree
Hide file tree
Showing 10 changed files with 45 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
unitOfWork.Rollback();
}
}
else
await Task.Delay(TimeSpan.FromSeconds(1), stoppingToken);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

namespace ProjectOrigin.Stamp.Server.EventHandlers;

public class CertificateCreatedEvent
public class CertificateStoredEvent
{
public required Guid CertificateId { get; init; }
public required string RegistryName { get; init; }
Expand All @@ -31,19 +31,19 @@ public class CertificateCreatedEvent
public required IEnumerable<CertificateHashedAttribute> HashedAttributes { get; init; }
}

public class CertificateCreatedEventHandler : IConsumer<CertificateCreatedEvent>
public class IssueInRegistryConsumer : IConsumer<CertificateStoredEvent>
{
private readonly IKeyGenerator _keyGenerator;
private readonly RegistryOptions _registryOptions;

public CertificateCreatedEventHandler(IKeyGenerator keyGenerator,
public IssueInRegistryConsumer(IKeyGenerator keyGenerator,
IOptions<RegistryOptions> registryOptions)
{
_keyGenerator = keyGenerator;
_registryOptions = registryOptions.Value;
}

public async Task Consume(ConsumeContext<CertificateCreatedEvent> context)
public async Task Consume(ConsumeContext<CertificateStoredEvent> context)
{
var message = context.Message;
var endpointPosition = WalletEndpointPositionCalculator.CalculateWalletEndpointPosition(message.Start);
Expand Down Expand Up @@ -80,17 +80,17 @@ await context.Publish<CertificateSentToRegistryEvent>(new CertificateSentToRegis
}
}

public class CertificateCreatedEventHandlerDefinition : ConsumerDefinition<CertificateCreatedEventHandler>
public class IssueInRegistryConsumerDefinition : ConsumerDefinition<IssueInRegistryConsumer>
{
private readonly RetryOptions _retryOptions;

public CertificateCreatedEventHandlerDefinition(IOptions<RetryOptions> options)
public IssueInRegistryConsumerDefinition(IOptions<RetryOptions> options)
{
_retryOptions = options.Value;
}

protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator,
IConsumerConfigurator<CertificateCreatedEventHandler> consumerConfigurator,
IConsumerConfigurator<IssueInRegistryConsumer> consumerConfigurator,
IRegistrationContext context)
{
endpointConfigurator.UseMessageRetry(r => r
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ public record CertificateIssuedInRegistryEvent
public required byte[] RandomR { get; init; }
}

public class CertificateIssuedInRegistryEventHandler : IConsumer<CertificateIssuedInRegistryEvent>
public class MarkCertificateAsIssuedConsumer : IConsumer<CertificateIssuedInRegistryEvent>
{
private readonly IUnitOfWork _unitOfWork;
private readonly ILogger<CertificateIssuedInRegistryEventHandler> _logger;
private readonly ILogger<MarkCertificateAsIssuedConsumer> _logger;

public CertificateIssuedInRegistryEventHandler(IUnitOfWork unitOfWork,
ILogger<CertificateIssuedInRegistryEventHandler> logger)
public MarkCertificateAsIssuedConsumer(IUnitOfWork unitOfWork,
ILogger<MarkCertificateAsIssuedConsumer> logger)
{
_unitOfWork = unitOfWork;
_logger = logger;
Expand Down Expand Up @@ -72,17 +72,17 @@ await _unitOfWork.OutboxMessageRepository.Create(new OutboxMessage
}
}

public class CertificateIssuedInRegistryEventHandlerDefinition : ConsumerDefinition<CertificateIssuedInRegistryEventHandler>
public class MarkCertificateAsIssuedConsumerDefinition : ConsumerDefinition<MarkCertificateAsIssuedConsumer>
{
private readonly RetryOptions _retryOptions;

public CertificateIssuedInRegistryEventHandlerDefinition(IOptions<RetryOptions> options)
public MarkCertificateAsIssuedConsumerDefinition(IOptions<RetryOptions> options)
{
_retryOptions = options.Value;
}

protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator,
IConsumerConfigurator<CertificateIssuedInRegistryEventHandler> consumerConfigurator,
IConsumerConfigurator<MarkCertificateAsIssuedConsumer> consumerConfigurator,
IRegistrationContext context)
{
endpointConfigurator.UseMessageRetry(r => r
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ public record CertificateFailedInRegistryEvent
public required string RegistryName { get; init; }
}

public class CertificateFailedInRegistryEventHandler : IConsumer<CertificateFailedInRegistryEvent>
public class RejectCertificateConsumer : IConsumer<CertificateFailedInRegistryEvent>
{
private readonly IUnitOfWork _unitOfWork;
private readonly ILogger<CertificateFailedInRegistryEventHandler> _logger;
private readonly ILogger<RejectCertificateConsumer> _logger;

public CertificateFailedInRegistryEventHandler(IUnitOfWork unitOfWork, ILogger<CertificateFailedInRegistryEventHandler> logger)
public RejectCertificateConsumer(IUnitOfWork unitOfWork, ILogger<RejectCertificateConsumer> logger)
{
_unitOfWork = unitOfWork;
_logger = logger;
Expand Down Expand Up @@ -52,17 +52,17 @@ public async Task Consume(ConsumeContext<CertificateFailedInRegistryEvent> conte
}
}

public class CertificateFailedInRegistryEventHandlerDefinition : ConsumerDefinition<CertificateFailedInRegistryEventHandler>
public class RejectCertificateConsumerDefinition : ConsumerDefinition<RejectCertificateConsumer>
{
private readonly RetryOptions _retryOptions;

public CertificateFailedInRegistryEventHandlerDefinition(IOptions<RetryOptions> options)
public RejectCertificateConsumerDefinition(IOptions<RetryOptions> options)
{
_retryOptions = options.Value;
}

protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator,
IConsumerConfigurator<CertificateFailedInRegistryEventHandler> consumerConfigurator,
IConsumerConfigurator<RejectCertificateConsumer> consumerConfigurator,
IRegistrationContext context)
{
endpointConfigurator.UseMessageRetry(r => r
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ public record CertificateMarkedAsIssuedEvent
public required List<CertificateHashedAttribute> HashedAttributes { get; init; }
}

public class CertificateMarkedAsIssuedEventHandler : IConsumer<CertificateMarkedAsIssuedEvent>
public class SendToWalletConsumer : IConsumer<CertificateMarkedAsIssuedEvent>
{
private readonly IHttpClientFactory _httpClientFactory;
private readonly IUnitOfWork _unitOfWork;
private readonly ILogger<CertificateMarkedAsIssuedEventHandler> _logger;
private readonly ILogger<SendToWalletConsumer> _logger;

public CertificateMarkedAsIssuedEventHandler(IHttpClientFactory httpClientFactory, IUnitOfWork unitOfWork, ILogger<CertificateMarkedAsIssuedEventHandler> logger)
public SendToWalletConsumer(IHttpClientFactory httpClientFactory, IUnitOfWork unitOfWork, ILogger<SendToWalletConsumer> logger)
{
_httpClientFactory = httpClientFactory;
_unitOfWork = unitOfWork;
Expand Down Expand Up @@ -88,17 +88,17 @@ public async Task Consume(ConsumeContext<CertificateMarkedAsIssuedEvent> context
}
}

public class CertificateMarkedAsIssuedEventHandlerDefinition : ConsumerDefinition<CertificateMarkedAsIssuedEventHandler>
public class SendToWalletConsumerDefinition : ConsumerDefinition<SendToWalletConsumer>
{
private readonly RetryOptions _retryOptions;

public CertificateMarkedAsIssuedEventHandlerDefinition(IOptions<RetryOptions> options)
public SendToWalletConsumerDefinition(IOptions<RetryOptions> options)
{
_retryOptions = options.Value;
}

protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator,
IConsumerConfigurator<CertificateMarkedAsIssuedEventHandler> consumerConfigurator,
IConsumerConfigurator<SendToWalletConsumer> consumerConfigurator,
IRegistrationContext context)
{
endpointConfigurator.UseMessageRetry(r => r
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ public record CertificateSentToRegistryEvent
public required byte[] RandomR { get; init; }
}

public class CertificateSentToRegistryEventHandler : IConsumer<CertificateSentToRegistryEvent>
public class WaitForCommittedRegistryTransactionConsumer : IConsumer<CertificateSentToRegistryEvent>
{
private readonly ILogger<CertificateSentToRegistryEventHandler> _logger;
private readonly ILogger<WaitForCommittedRegistryTransactionConsumer> _logger;
private readonly RegistryOptions _registryOptions;

public CertificateSentToRegistryEventHandler(ILogger<CertificateSentToRegistryEventHandler> logger, IOptions<RegistryOptions> registryOptions)
public WaitForCommittedRegistryTransactionConsumer(ILogger<WaitForCommittedRegistryTransactionConsumer> logger, IOptions<RegistryOptions> registryOptions)
{
_logger = logger;
_registryOptions = registryOptions.Value;
Expand Down Expand Up @@ -93,17 +93,17 @@ public RegistryTransactionStillProcessingException(string message) : base(messag
}
}

public class CertificateSentToRegistryEventHandlerDefinition : ConsumerDefinition<CertificateSentToRegistryEventHandler>
public class WaitForCommittedRegistryTransactionConsumerDefinition : ConsumerDefinition<WaitForCommittedRegistryTransactionConsumer>
{
private readonly RetryOptions _retryOptions;

public CertificateSentToRegistryEventHandlerDefinition(IOptions<RetryOptions> options)
public WaitForCommittedRegistryTransactionConsumerDefinition(IOptions<RetryOptions> options)
{
_retryOptions = options.Value;
}

protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator,
IConsumerConfigurator<CertificateSentToRegistryEventHandler> consumerConfigurator,
IConsumerConfigurator<WaitForCommittedRegistryTransactionConsumer> consumerConfigurator,
IRegistrationContext context)
{
endpointConfigurator.UseMessageRetry(r => r
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public async Task<ActionResult> IssueCertificate(
await unitOfWork.CertificateRepository.Create(certificate);
}

var payloadObj = new CertificateCreatedEvent
var payloadObj = new CertificateStoredEvent
{
CertificateId = certificate.Id,
CertificateType = certificate.CertificateType,
Expand All @@ -94,7 +94,7 @@ await unitOfWork.OutboxMessageRepository.Create(new OutboxMessage
Created = DateTimeOffset.Now.ToUtcTime(),
Id = Guid.NewGuid(),
JsonPayload = JsonSerializer.Serialize(payloadObj),
MessageType = typeof(CertificateCreatedEvent).ToString()
MessageType = typeof(CertificateStoredEvent).ToString()
});
unitOfWork.Commit();

Expand Down
10 changes: 5 additions & 5 deletions src/ProjectOrigin.Stamp.Server/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,11 @@ public void ConfigureServices(IServiceCollection services)
{
o.SetKebabCaseEndpointNameFormatter();

o.AddConsumer<CertificateCreatedEventHandler, CertificateCreatedEventHandlerDefinition>();
o.AddConsumer<CertificateFailedInRegistryEventHandler, CertificateFailedInRegistryEventHandlerDefinition>();
o.AddConsumer<CertificateIssuedInRegistryEventHandler, CertificateIssuedInRegistryEventHandlerDefinition>();
o.AddConsumer<CertificateMarkedAsIssuedEventHandler, CertificateMarkedAsIssuedEventHandlerDefinition>();
o.AddConsumer<CertificateSentToRegistryEventHandler, CertificateSentToRegistryEventHandlerDefinition>();
o.AddConsumer<IssueInRegistryConsumer, IssueInRegistryConsumerDefinition>();
o.AddConsumer<RejectCertificateConsumer, RejectCertificateConsumerDefinition>();
o.AddConsumer<MarkCertificateAsIssuedConsumer, MarkCertificateAsIssuedConsumerDefinition>();
o.AddConsumer<SendToWalletConsumer, SendToWalletConsumerDefinition>();
o.AddConsumer<WaitForCommittedRegistryTransactionConsumer, WaitForCommittedRegistryTransactionConsumerDefinition>();

o.ConfigureMassTransitTransport(_configuration.GetSection("MessageBroker").GetValid<MessageBrokerOptions>());
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public OutboxPollingWorkerTests()
public async Task ShouldPublishAndDeleteMessages()
{
var privateKey = new Secp256k1Algorithm().GenerateNewPrivateKey();
var payloadObj = new CertificateCreatedEvent
var payloadObj = new CertificateStoredEvent
{
CertificateId = Guid.NewGuid(),
CertificateType = GranularCertificateType.Production,
Expand All @@ -59,7 +59,7 @@ public async Task ShouldPublishAndDeleteMessages()
{
Created = DateTimeOffset.Now.ToUtcTime(),
JsonPayload = JsonSerializer.Serialize(payloadObj),
MessageType = typeof(CertificateCreatedEvent).ToString(),
MessageType = typeof(CertificateStoredEvent).ToString(),
Id = Guid.NewGuid()
};
using var tokenSource = new CancellationTokenSource();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public OutboxMessageRepositoryTests(PostgresDatabaseFixture dbFixture)
public async Task CreateGetAndParseOutboxMessage()
{
var privateKey = new Secp256k1Algorithm().GenerateNewPrivateKey();
var payloadObj = new CertificateCreatedEvent
var payloadObj = new CertificateStoredEvent
{
CertificateId = Guid.NewGuid(),
CertificateType = GranularCertificateType.Production,
Expand All @@ -44,7 +44,7 @@ public async Task CreateGetAndParseOutboxMessage()
{
Created = DateTimeOffset.Now.ToUtcTime(),
JsonPayload = JsonSerializer.Serialize(payloadObj),
MessageType = typeof(CertificateCreatedEvent).ToString(),
MessageType = typeof(CertificateStoredEvent).ToString(),
Id = Guid.NewGuid()
};

Expand Down

0 comments on commit 80796af

Please sign in to comment.