From 7dc92e022c79182e90ba5c1f65bcda3c88f66568 Mon Sep 17 00:00:00 2001 From: Martin Schmidt Date: Thu, 5 Dec 2024 14:53:25 +0100 Subject: [PATCH] Added finalized state (#539) --- Registry.Dockerfile | 2 + chart/templates/deployment.yaml | 2 + chart/values.yaml | 3 + global.json | 2 +- protos/registry.proto | 1 + .../Process/BlockFinalizerJob.cs | 2 +- .../Grpc/RegistryService.cs | 13 ++- .../Options/RegistryOptions.cs | 2 + .../Repository/InMemory/InMemoryRepository.cs | 4 +- .../Repository/Models/TransactionStatus.cs | 3 +- .../Postgres/PostgresqlRepository.cs | 4 +- .../TransactionProcessorDispatcher.cs | 1 + .../RegistryServiceClientExtensions.cs | 2 +- .../ContainerTest.cs | 3 +- .../FlowTests.cs | 2 +- .../FlowTestsComitted.cs | 105 ++++++++++++++++++ .../Helper.cs | 10 ++ .../PerformanceTests.cs | 10 +- .../BlockFinalizerJobTests.cs | 2 +- .../AbstractTransactionStatusServiceTests.cs | 8 +- 20 files changed, 159 insertions(+), 22 deletions(-) create mode 100644 test/ProjectOrigin.Registry.IntegrationTests/FlowTestsComitted.cs diff --git a/Registry.Dockerfile b/Registry.Dockerfile index 231db785..adce4794 100644 --- a/Registry.Dockerfile +++ b/Registry.Dockerfile @@ -22,4 +22,6 @@ COPY --from=build /app/publish . EXPOSE 5000 EXPOSE 5001 +ENV ReturnComittedForFinalized=true + ENTRYPOINT ["dotnet", "App.dll"] diff --git a/chart/templates/deployment.yaml b/chart/templates/deployment.yaml index c69843b5..30bc3dde 100644 --- a/chart/templates/deployment.yaml +++ b/chart/templates/deployment.yaml @@ -68,6 +68,8 @@ spec: # General configuration - name: RegistryName value: {{ $.Values.registryName | default $.Release.Name }} + - name: ReturnComittedForFinalized + value: {{ $.Values.returnComittedForFinalized | quote }} # TransactionProcessor configuration - name: TransactionProcessor__ServerNumber diff --git a/chart/values.yaml b/chart/values.yaml index 733d2b99..1d702741 100644 --- a/chart/values.yaml +++ b/chart/values.yaml @@ -47,6 +47,9 @@ otlp: # Enables one to override the name of the registry registryName: +# The registry returns comitted for finalized transactions to enable backwards compatibility with older clients +returnComittedForFinalized: true + # transactionProcessor holds the configuration for the transaction processor transactionProcessor: # replicas defines the number of transaction processor containers to run diff --git a/global.json b/global.json index f7149747..8eaa52a8 100644 --- a/global.json +++ b/global.json @@ -1,6 +1,6 @@ { "sdk": { - "version": "8.0.402", + "version": "8.0.401", "rollForward": "feature" } } diff --git a/protos/registry.proto b/protos/registry.proto index 277e6e45..edbe5833 100644 --- a/protos/registry.proto +++ b/protos/registry.proto @@ -74,6 +74,7 @@ enum TransactionState { PENDING = 1; FAILED = 2; COMMITTED = 3; + FINALIZED = 4; } message GetTransactionProofRequest { diff --git a/src/ProjectOrigin.Registry/BlockFinalizer/Process/BlockFinalizerJob.cs b/src/ProjectOrigin.Registry/BlockFinalizer/Process/BlockFinalizerJob.cs index 63e59fc2..cca29dda 100644 --- a/src/ProjectOrigin.Registry/BlockFinalizer/Process/BlockFinalizerJob.cs +++ b/src/ProjectOrigin.Registry/BlockFinalizer/Process/BlockFinalizerJob.cs @@ -50,7 +50,7 @@ public async Task Execute(CancellationToken stoppingToken) foreach (var transactionHash in newBlock.TransactionHashes) { - await _statusService.SetTransactionStatus(transactionHash, new TransactionStatusRecord(TransactionStatus.Committed)); + await _statusService.SetTransactionStatus(transactionHash, new TransactionStatusRecord(TransactionStatus.Finalized)); } sw.Stop(); diff --git a/src/ProjectOrigin.Registry/Grpc/RegistryService.cs b/src/ProjectOrigin.Registry/Grpc/RegistryService.cs index ae0736a7..6e59cf1c 100644 --- a/src/ProjectOrigin.Registry/Grpc/RegistryService.cs +++ b/src/ProjectOrigin.Registry/Grpc/RegistryService.cs @@ -11,6 +11,8 @@ using ProjectOrigin.Registry.TransactionStatusCache; using ProjectOrigin.Registry.Repository; using ProjectOrigin.Registry.Repository.Models; +using Microsoft.Extensions.Options; +using ProjectOrigin.Registry.Options; namespace ProjectOrigin.Registry.Grpc; @@ -18,18 +20,20 @@ public class RegistryService : V1.RegistryService.RegistryServiceBase { public static readonly Meter Meter = new("Registry.RegistryService"); public static readonly Counter TransactionsSubmitted = Meter.CreateCounter("TransactionsSubmitted"); - + private readonly RegistryOptions _options; private readonly ITransactionRepository _transactionRepository; private readonly ITransactionStatusService _transactionStatusService; private readonly IRabbitMqChannelPool _brokerPool; private readonly IQueueResolver _queueResolver; public RegistryService( + IOptions options, ITransactionRepository transactionRepository, ITransactionStatusService transactionStatusService, IRabbitMqChannelPool brokerPool, IQueueResolver queueResolver) { + _options = options.Value; _transactionRepository = transactionRepository; _transactionStatusService = transactionStatusService; _brokerPool = brokerPool; @@ -64,9 +68,14 @@ public override async Task GetTransactionStatus(Ge { var transactionHash = new TransactionHash(Convert.FromBase64String(request.Id)); var state = await _transactionStatusService.GetTransactionStatus(transactionHash).ConfigureAwait(false); + + var returnState = _options.ReturnComittedForFinalized && state.NewStatus == TransactionStatus.Finalized + ? V1.TransactionState.Committed + : (V1.TransactionState)state.NewStatus; + return new GetTransactionStatusResponse { - Status = (V1.TransactionState)state.NewStatus, + Status = returnState, Message = state.Message, }; } diff --git a/src/ProjectOrigin.Registry/Options/RegistryOptions.cs b/src/ProjectOrigin.Registry/Options/RegistryOptions.cs index c9f46f67..350733d0 100644 --- a/src/ProjectOrigin.Registry/Options/RegistryOptions.cs +++ b/src/ProjectOrigin.Registry/Options/RegistryOptions.cs @@ -6,4 +6,6 @@ public record RegistryOptions() { [Required(AllowEmptyStrings = false)] public string RegistryName { get; init; } = string.Empty; + + public bool ReturnComittedForFinalized { get; init; } = false; } diff --git a/src/ProjectOrigin.Registry/Repository/InMemory/InMemoryRepository.cs b/src/ProjectOrigin.Registry/Repository/InMemory/InMemoryRepository.cs index 81cdc912..1b15a09c 100644 --- a/src/ProjectOrigin.Registry/Repository/InMemory/InMemoryRepository.cs +++ b/src/ProjectOrigin.Registry/Repository/InMemory/InMemoryRepository.cs @@ -166,9 +166,9 @@ public Task GetTransactionStatus(TransactionHash transactionH var block = _blocks.SingleOrDefault(x => x.FromTransaction <= index && index <= x.ToTransaction); if (block is not null && block.Publication is not null) - return Task.FromResult(TransactionStatus.Committed); + return Task.FromResult(TransactionStatus.Finalized); - return Task.FromResult(TransactionStatus.Pending); + return Task.FromResult(TransactionStatus.Committed); } } diff --git a/src/ProjectOrigin.Registry/Repository/Models/TransactionStatus.cs b/src/ProjectOrigin.Registry/Repository/Models/TransactionStatus.cs index 76a62cb7..20703f25 100644 --- a/src/ProjectOrigin.Registry/Repository/Models/TransactionStatus.cs +++ b/src/ProjectOrigin.Registry/Repository/Models/TransactionStatus.cs @@ -5,5 +5,6 @@ public enum TransactionStatus Unknown = 0, Pending = 1, Failed = 2, - Committed = 3 + Committed = 3, + Finalized = 4 } diff --git a/src/ProjectOrigin.Registry/Repository/Postgres/PostgresqlRepository.cs b/src/ProjectOrigin.Registry/Repository/Postgres/PostgresqlRepository.cs index 445bcedb..12cecc18 100644 --- a/src/ProjectOrigin.Registry/Repository/Postgres/PostgresqlRepository.cs +++ b/src/ProjectOrigin.Registry/Repository/Postgres/PostgresqlRepository.cs @@ -120,9 +120,9 @@ public async Task GetTransactionStatus(TransactionHash transa new { id }); if (hasBeenPublished.HasValue && hasBeenPublished.Value) - return TransactionStatus.Committed; + return TransactionStatus.Finalized; - return TransactionStatus.Pending; + return TransactionStatus.Committed; } public async Task CreateNextBlock() diff --git a/src/ProjectOrigin.Registry/TransactionProcessor/TransactionProcessorDispatcher.cs b/src/ProjectOrigin.Registry/TransactionProcessor/TransactionProcessorDispatcher.cs index 966a47bb..3b58a638 100644 --- a/src/ProjectOrigin.Registry/TransactionProcessor/TransactionProcessorDispatcher.cs +++ b/src/ProjectOrigin.Registry/TransactionProcessor/TransactionProcessorDispatcher.cs @@ -57,6 +57,7 @@ public async Task Verify(V1.Transaction transaction) var nextEventIndex = stream.Count; var verifiableEvent = new StreamTransaction { TransactionHash = transactionHash, StreamId = streamId, StreamIndex = nextEventIndex, Payload = transaction.ToByteArray() }; await _transactionRepository.Store(verifiableEvent).ConfigureAwait(false); + await _transactionStatusService.SetTransactionStatus(transactionHash, new TransactionStatusRecord(TransactionStatus.Committed)); _logger.LogDebug("Transaction processed {transactionHash}", transactionHash); } diff --git a/test/ProjectOrigin.Registry.ChartTests/Extensions/RegistryServiceClientExtensions.cs b/test/ProjectOrigin.Registry.ChartTests/Extensions/RegistryServiceClientExtensions.cs index d3086c91..25ebee3c 100644 --- a/test/ProjectOrigin.Registry.ChartTests/Extensions/RegistryServiceClientExtensions.cs +++ b/test/ProjectOrigin.Registry.ChartTests/Extensions/RegistryServiceClientExtensions.cs @@ -33,7 +33,7 @@ private static async Task WaitForCommittedOrTimeou { var result = await getTransactionStatus(); - if (result.Status == TransactionState.Committed) + if (result.Status == TransactionState.Committed || result.Status == TransactionState.Finalized) return result; else if (result.Status == TransactionState.Failed) Assert.Fail($"Transaction failed ”{result.Status}” with message ”{result.Message}”"); diff --git a/test/ProjectOrigin.Registry.IntegrationTests/ContainerTest.cs b/test/ProjectOrigin.Registry.IntegrationTests/ContainerTest.cs index 706855e3..24a1e339 100644 --- a/test/ProjectOrigin.Registry.IntegrationTests/ContainerTest.cs +++ b/test/ProjectOrigin.Registry.IntegrationTests/ContainerTest.cs @@ -64,6 +64,7 @@ public ContainerTest( .WithPortBinding(GrpcPort, true) .WithCommand("--serve") .WithEnvironment("RegistryName", RegistryName) + .WithEnvironment("ReturnComittedForFinalized", "false") .WithEnvironment("Verifiers__project_origin.electricity.v1", verifierUrl) .WithEnvironment("ImmutableLog__type", "log") .WithEnvironment("BlockFinalizer__Interval", "00:00:05") @@ -147,7 +148,7 @@ public async Task issue_comsumption_certificate_success() status = await Helper.RepeatUntilOrTimeout( () => Client.GetStatus(transaction), - result => result.Status == Registry.V1.TransactionState.Committed, + result => result.Status == Registry.V1.TransactionState.Finalized, TimeSpan.FromSeconds(60)); status.Message.Should().BeEmpty(); diff --git a/test/ProjectOrigin.Registry.IntegrationTests/FlowTests.cs b/test/ProjectOrigin.Registry.IntegrationTests/FlowTests.cs index 58fb52c1..2decec12 100644 --- a/test/ProjectOrigin.Registry.IntegrationTests/FlowTests.cs +++ b/test/ProjectOrigin.Registry.IntegrationTests/FlowTests.cs @@ -82,7 +82,7 @@ public async Task issue_comsumption_certificate_success() status = await Helper.RepeatUntilOrTimeout( () => Client.GetStatus(transaction), - result => result.Status == Registry.V1.TransactionState.Committed, + result => result.Status == Registry.V1.TransactionState.Finalized, TimeSpan.FromSeconds(60)); status.Message.Should().BeEmpty(); diff --git a/test/ProjectOrigin.Registry.IntegrationTests/FlowTestsComitted.cs b/test/ProjectOrigin.Registry.IntegrationTests/FlowTestsComitted.cs new file mode 100644 index 00000000..3677f9c3 --- /dev/null +++ b/test/ProjectOrigin.Registry.IntegrationTests/FlowTestsComitted.cs @@ -0,0 +1,105 @@ +using Xunit.Abstractions; +using Xunit; +using System.Threading.Tasks; +using System; +using ProjectOrigin.PedersenCommitment; +using FluentAssertions; +using ProjectOrigin.Electricity.V1; +using ProjectOrigin.HierarchicalDeterministicKeys; +using System.Collections.Generic; +using ProjectOrigin.TestCommon.Fixtures; +using ProjectOrigin.Registry; +using ProjectOrigin.Registry.IntegrationTests.Fixtures; + +namespace ProjectOrigin.Electricity.IntegrationTests; + +public class FlowTestsComitted : + IClassFixture>, + IClassFixture, + IClassFixture>, + IClassFixture, + IClassFixture +{ + protected const string RegistryName = "SomeRegistry"; + protected readonly ElectricityServiceFixture _verifierFixture; + private readonly PostgresDatabaseFixture _postgresDatabaseFixture; + + private readonly Lazy _client; + protected Registry.V1.RegistryService.RegistryServiceClient Client => _client.Value; + + public FlowTestsComitted( + ElectricityServiceFixture verifierFixture, + TestServerFixture serverFixture, + PostgresDatabaseFixture postgresDatabaseFixture, + RedisFixture redisFixture, + RabbitMqFixture rabbitMqFixture, + ITestOutputHelper outputHelper) + { + _verifierFixture = verifierFixture; + _postgresDatabaseFixture = postgresDatabaseFixture; + _client = new(() => new Registry.V1.RegistryService.RegistryServiceClient(serverFixture.Channel)); + serverFixture.ConfigureHostConfiguration(new Dictionary() + { + {"Otlp:Enabled", "false"}, + {"RegistryName", RegistryName}, + {"ReturnComittedForFinalized", "true"}, + {"Verifiers:project_origin.electricity.v1", _verifierFixture.Url}, + {"ImmutableLog:type", "log"}, + {"BlockFinalizer:Interval", "00:00:05"}, + {"Persistence:type", "postgresql"}, + {"ConnectionStrings:Database", _postgresDatabaseFixture.HostConnectionString}, + {"Cache:Type", "redis"}, + {"Cache:Redis:ConnectionString", redisFixture.HostConnectionString}, + {"RabbitMq:Hostname", rabbitMqFixture.Hostname}, + {"RabbitMq:AmqpPort", rabbitMqFixture.AmqpPort.ToString()}, + {"RabbitMq:HttpApiPort", rabbitMqFixture.HttpApiPort.ToString()}, + {"RabbitMq:Username", RabbitMqFixture.Username}, + {"RabbitMq:Password", RabbitMqFixture.Password}, + {"TransactionProcessor:ServerNumber", "0"}, + {"TransactionProcessor:Servers", "1"}, + {"TransactionProcessor:Threads", "5"}, + {"TransactionProcessor:Weight", "10"}, + }); + } + + [Fact] + public async Task issue_comsumption_certificate_success() + { + var owner = Algorithms.Secp256k1.GenerateNewPrivateKey(); + + var commitmentInfo = new SecretCommitmentInfo(250); + var certId = Guid.NewGuid(); + + IssuedEvent @event = Helper.CreateIssuedEvent(RegistryName, _verifierFixture.IssuerArea, owner.PublicKey, commitmentInfo, certId); + + var transaction = Helper.SignTransaction(@event.CertificateId, @event, _verifierFixture.IssuerKey); + + var status = await Client.GetStatus(transaction); + status.Status.Should().Be(Registry.V1.TransactionState.Unknown); + + await Client.SendTransactions(transaction); + status = await Client.GetStatus(transaction); + status.Status.Should().Be(Registry.V1.TransactionState.Pending); + + status = await Helper.RepeatUntilOrTimeout( + () => Client.GetStatus(transaction), + result => result.Status == Registry.V1.TransactionState.Committed, + TimeSpan.FromSeconds(60)); + + status.Message.Should().BeEmpty(); + + var stream = await Client.GetStream(certId); + stream.Transactions.Should().HaveCount(1); + + await Task.Delay(10000); + + var blocks = await Client.GetBlocksAsync(new Registry.V1.GetBlocksRequest + { + Skip = 0, + Limit = 1, + IncludeTransactions = false + }); + + blocks.Blocks.Should().HaveCount(1); + } +} diff --git a/test/ProjectOrigin.Registry.IntegrationTests/Helper.cs b/test/ProjectOrigin.Registry.IntegrationTests/Helper.cs index ea60ac84..39131c1e 100644 --- a/test/ProjectOrigin.Registry.IntegrationTests/Helper.cs +++ b/test/ProjectOrigin.Registry.IntegrationTests/Helper.cs @@ -86,6 +86,16 @@ public static async Task SendTransactions(this Registry.V1.RegistryService.Regis }); } + public static async Task GetBlock(this Registry.V1.RegistryService.RegistryServiceClient client) + { + return await client.GetBlocksAsync(new Registry.V1.GetBlocksRequest + { + Skip = 0, + Limit = 1, + IncludeTransactions = false + }); + } + public static Registry.V1.Transaction SignTransaction(Common.V1.FederatedStreamId streamId, IMessage @event, IPrivateKey signerKey) { var header = new Registry.V1.TransactionHeader() diff --git a/test/ProjectOrigin.Registry.IntegrationTests/PerformanceTests.cs b/test/ProjectOrigin.Registry.IntegrationTests/PerformanceTests.cs index 4643f356..593f52fe 100644 --- a/test/ProjectOrigin.Registry.IntegrationTests/PerformanceTests.cs +++ b/test/ProjectOrigin.Registry.IntegrationTests/PerformanceTests.cs @@ -128,7 +128,7 @@ public async Task TestThroughput() { if (queued.TryDequeue(out var transaction)) { - if (await IsCommitted(channel, transaction)) + if (await IsFinalized(channel, transaction)) { completed.Add(transaction); } @@ -170,7 +170,7 @@ public async Task TestLatencySequential() { stopwatch.Restart(); var transaction = await SendRequest(channel); - while (!await IsCommitted(channel, transaction)) + while (!await IsFinalized(channel, transaction)) { await Task.Delay(25); } @@ -213,7 +213,7 @@ public async Task TestLatencyParallel() while (queued.TryDequeue(out var item)) { - if (await IsCommitted(channel, item.transaction)) + if (await IsFinalized(channel, item.transaction)) { item.stopwatch.Stop(); measurements.Add(item.stopwatch.ElapsedMilliseconds); @@ -279,11 +279,11 @@ private async Task WriteRegistryContainerLog() _outputHelper.WriteLine($"-------Container stdout------\n{log.Stdout}\n-------Container stderr------\n{log.Stderr}\n\n----------"); } - private static async Task IsCommitted(GrpcChannel channel, Registry.V1.Transaction transaction) + private static async Task IsFinalized(GrpcChannel channel, Registry.V1.Transaction transaction) { var client = new Registry.V1.RegistryService.RegistryServiceClient(channel); var result = await client.GetStatus(transaction); - return result.Status == Registry.V1.TransactionState.Committed; + return result.Status == Registry.V1.TransactionState.Finalized; } } diff --git a/test/ProjectOrigin.Registry.Tests/BlockFinalizerJobTests.cs b/test/ProjectOrigin.Registry.Tests/BlockFinalizerJobTests.cs index e2480ea7..9cacd84e 100644 --- a/test/ProjectOrigin.Registry.Tests/BlockFinalizerJobTests.cs +++ b/test/ProjectOrigin.Registry.Tests/BlockFinalizerJobTests.cs @@ -68,7 +68,7 @@ public async Task Success() _repository.Verify(obj => obj.CreateNextBlock(), Times.Exactly(1)); _blockPublisher.Verify(obj => obj.PublishBlock(It.Is(x => x == header)), Times.Exactly(1)); _repository.Verify(obj => obj.FinalizeBlock(It.Is(x => x.Equals(BlockHash.FromHeader(header))), It.Is(x => x == publication)), Times.Exactly(1)); - _statusService.Verify(obj => obj.SetTransactionStatus(It.IsAny(), It.Is(x => x.NewStatus == TransactionStatus.Committed)), Times.Exactly(transactions.Count)); + _statusService.Verify(obj => obj.SetTransactionStatus(It.IsAny(), It.Is(x => x.NewStatus == TransactionStatus.Finalized)), Times.Exactly(transactions.Count)); } [Fact] diff --git a/test/ProjectOrigin.Registry.Tests/TransactionStatusCache/AbstractTransactionStatusServiceTests.cs b/test/ProjectOrigin.Registry.Tests/TransactionStatusCache/AbstractTransactionStatusServiceTests.cs index 7c653395..491b0597 100644 --- a/test/ProjectOrigin.Registry.Tests/TransactionStatusCache/AbstractTransactionStatusServiceTests.cs +++ b/test/ProjectOrigin.Registry.Tests/TransactionStatusCache/AbstractTransactionStatusServiceTests.cs @@ -68,7 +68,7 @@ public async Task ShouldNotSetLowerStatus() } [Fact] - public async Task ShouldReturnPendingRecordFromRepositoryNotInBlock() + public async Task ShouldReturnCommittedRecordFromRepositoryNotInBlock() { // Arrange var data = _fixture.Create(); @@ -85,11 +85,11 @@ await _repository.Store(new StreamTransaction var record = await Service.GetTransactionStatus(transactionHash); // Assert - record.NewStatus.Should().Be(TransactionStatus.Pending); + record.NewStatus.Should().Be(TransactionStatus.Committed); } [Fact] - public async Task ShouldReturnCommittedRecordFromRepositoryInBlock() + public async Task ShouldReturnFinalizedRecordFromRepositoryInBlock() { // Arrange var data = _fixture.Create(); @@ -109,6 +109,6 @@ await _repository.Store(new StreamTransaction var record = await Service.GetTransactionStatus(transactionHash); // Assert - record.NewStatus.Should().Be(TransactionStatus.Committed); + record.NewStatus.Should().Be(TransactionStatus.Finalized); } }