Skip to content

Commit

Permalink
Added finalized state (#539)
Browse files Browse the repository at this point in the history
  • Loading branch information
MartinSchmidt authored Dec 5, 2024
1 parent f9952fb commit 7dc92e0
Show file tree
Hide file tree
Showing 20 changed files with 159 additions and 22 deletions.
2 changes: 2 additions & 0 deletions Registry.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,6 @@ COPY --from=build /app/publish .
EXPOSE 5000
EXPOSE 5001

ENV ReturnComittedForFinalized=true

ENTRYPOINT ["dotnet", "App.dll"]
2 changes: 2 additions & 0 deletions chart/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ spec:
# General configuration
- name: RegistryName
value: {{ $.Values.registryName | default $.Release.Name }}
- name: ReturnComittedForFinalized
value: {{ $.Values.returnComittedForFinalized | quote }}

# TransactionProcessor configuration
- name: TransactionProcessor__ServerNumber
Expand Down
3 changes: 3 additions & 0 deletions chart/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ otlp:
# Enables one to override the name of the registry
registryName:

# The registry returns comitted for finalized transactions to enable backwards compatibility with older clients
returnComittedForFinalized: true

# transactionProcessor holds the configuration for the transaction processor
transactionProcessor:
# replicas defines the number of transaction processor containers to run
Expand Down
2 changes: 1 addition & 1 deletion global.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"sdk": {
"version": "8.0.402",
"version": "8.0.401",
"rollForward": "feature"
}
}
1 change: 1 addition & 0 deletions protos/registry.proto
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ enum TransactionState {
PENDING = 1;
FAILED = 2;
COMMITTED = 3;
FINALIZED = 4;
}

message GetTransactionProofRequest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public async Task Execute(CancellationToken stoppingToken)

foreach (var transactionHash in newBlock.TransactionHashes)
{
await _statusService.SetTransactionStatus(transactionHash, new TransactionStatusRecord(TransactionStatus.Committed));
await _statusService.SetTransactionStatus(transactionHash, new TransactionStatusRecord(TransactionStatus.Finalized));
}

sw.Stop();
Expand Down
13 changes: 11 additions & 2 deletions src/ProjectOrigin.Registry/Grpc/RegistryService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,25 +11,29 @@
using ProjectOrigin.Registry.TransactionStatusCache;
using ProjectOrigin.Registry.Repository;
using ProjectOrigin.Registry.Repository.Models;
using Microsoft.Extensions.Options;
using ProjectOrigin.Registry.Options;

namespace ProjectOrigin.Registry.Grpc;

public class RegistryService : V1.RegistryService.RegistryServiceBase
{
public static readonly Meter Meter = new("Registry.RegistryService");
public static readonly Counter<long> TransactionsSubmitted = Meter.CreateCounter<long>("TransactionsSubmitted");

private readonly RegistryOptions _options;
private readonly ITransactionRepository _transactionRepository;
private readonly ITransactionStatusService _transactionStatusService;
private readonly IRabbitMqChannelPool _brokerPool;
private readonly IQueueResolver _queueResolver;

public RegistryService(
IOptions<RegistryOptions> options,
ITransactionRepository transactionRepository,
ITransactionStatusService transactionStatusService,
IRabbitMqChannelPool brokerPool,
IQueueResolver queueResolver)
{
_options = options.Value;
_transactionRepository = transactionRepository;
_transactionStatusService = transactionStatusService;
_brokerPool = brokerPool;
Expand Down Expand Up @@ -64,9 +68,14 @@ public override async Task<GetTransactionStatusResponse> GetTransactionStatus(Ge
{
var transactionHash = new TransactionHash(Convert.FromBase64String(request.Id));
var state = await _transactionStatusService.GetTransactionStatus(transactionHash).ConfigureAwait(false);

var returnState = _options.ReturnComittedForFinalized && state.NewStatus == TransactionStatus.Finalized
? V1.TransactionState.Committed
: (V1.TransactionState)state.NewStatus;

return new GetTransactionStatusResponse
{
Status = (V1.TransactionState)state.NewStatus,
Status = returnState,
Message = state.Message,
};
}
Expand Down
2 changes: 2 additions & 0 deletions src/ProjectOrigin.Registry/Options/RegistryOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,6 @@ public record RegistryOptions()
{
[Required(AllowEmptyStrings = false)]
public string RegistryName { get; init; } = string.Empty;

public bool ReturnComittedForFinalized { get; init; } = false;
}
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,9 @@ public Task<TransactionStatus> GetTransactionStatus(TransactionHash transactionH
var block = _blocks.SingleOrDefault(x => x.FromTransaction <= index && index <= x.ToTransaction);

if (block is not null && block.Publication is not null)
return Task.FromResult(TransactionStatus.Committed);
return Task.FromResult(TransactionStatus.Finalized);

return Task.FromResult(TransactionStatus.Pending);
return Task.FromResult(TransactionStatus.Committed);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ public enum TransactionStatus
Unknown = 0,
Pending = 1,
Failed = 2,
Committed = 3
Committed = 3,
Finalized = 4
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,9 @@ public async Task<TransactionStatus> GetTransactionStatus(TransactionHash transa
new { id });

if (hasBeenPublished.HasValue && hasBeenPublished.Value)
return TransactionStatus.Committed;
return TransactionStatus.Finalized;

return TransactionStatus.Pending;
return TransactionStatus.Committed;
}

public async Task<NewBlock?> CreateNextBlock()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public async Task Verify(V1.Transaction transaction)
var nextEventIndex = stream.Count;
var verifiableEvent = new StreamTransaction { TransactionHash = transactionHash, StreamId = streamId, StreamIndex = nextEventIndex, Payload = transaction.ToByteArray() };
await _transactionRepository.Store(verifiableEvent).ConfigureAwait(false);
await _transactionStatusService.SetTransactionStatus(transactionHash, new TransactionStatusRecord(TransactionStatus.Committed));

_logger.LogDebug("Transaction processed {transactionHash}", transactionHash);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ private static async Task<GetTransactionStatusResponse> WaitForCommittedOrTimeou
{
var result = await getTransactionStatus();

if (result.Status == TransactionState.Committed)
if (result.Status == TransactionState.Committed || result.Status == TransactionState.Finalized)
return result;
else if (result.Status == TransactionState.Failed)
Assert.Fail($"Transaction failed ”{result.Status}” with message ”{result.Message}”");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public ContainerTest(
.WithPortBinding(GrpcPort, true)
.WithCommand("--serve")
.WithEnvironment("RegistryName", RegistryName)
.WithEnvironment("ReturnComittedForFinalized", "false")
.WithEnvironment("Verifiers__project_origin.electricity.v1", verifierUrl)
.WithEnvironment("ImmutableLog__type", "log")
.WithEnvironment("BlockFinalizer__Interval", "00:00:05")
Expand Down Expand Up @@ -147,7 +148,7 @@ public async Task issue_comsumption_certificate_success()

status = await Helper.RepeatUntilOrTimeout(
() => Client.GetStatus(transaction),
result => result.Status == Registry.V1.TransactionState.Committed,
result => result.Status == Registry.V1.TransactionState.Finalized,
TimeSpan.FromSeconds(60));

status.Message.Should().BeEmpty();
Expand Down
2 changes: 1 addition & 1 deletion test/ProjectOrigin.Registry.IntegrationTests/FlowTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public async Task issue_comsumption_certificate_success()

status = await Helper.RepeatUntilOrTimeout(
() => Client.GetStatus(transaction),
result => result.Status == Registry.V1.TransactionState.Committed,
result => result.Status == Registry.V1.TransactionState.Finalized,
TimeSpan.FromSeconds(60));

status.Message.Should().BeEmpty();
Expand Down
105 changes: 105 additions & 0 deletions test/ProjectOrigin.Registry.IntegrationTests/FlowTestsComitted.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
using Xunit.Abstractions;
using Xunit;
using System.Threading.Tasks;
using System;
using ProjectOrigin.PedersenCommitment;
using FluentAssertions;
using ProjectOrigin.Electricity.V1;
using ProjectOrigin.HierarchicalDeterministicKeys;
using System.Collections.Generic;
using ProjectOrigin.TestCommon.Fixtures;
using ProjectOrigin.Registry;
using ProjectOrigin.Registry.IntegrationTests.Fixtures;

namespace ProjectOrigin.Electricity.IntegrationTests;

public class FlowTestsComitted :
IClassFixture<TestServerFixture<Startup>>,
IClassFixture<ElectricityServiceFixture>,
IClassFixture<PostgresDatabaseFixture<Startup>>,
IClassFixture<RedisFixture>,
IClassFixture<RabbitMqFixture>
{
protected const string RegistryName = "SomeRegistry";
protected readonly ElectricityServiceFixture _verifierFixture;
private readonly PostgresDatabaseFixture<Startup> _postgresDatabaseFixture;

private readonly Lazy<Registry.V1.RegistryService.RegistryServiceClient> _client;
protected Registry.V1.RegistryService.RegistryServiceClient Client => _client.Value;

public FlowTestsComitted(
ElectricityServiceFixture verifierFixture,
TestServerFixture<Startup> serverFixture,
PostgresDatabaseFixture<Startup> postgresDatabaseFixture,
RedisFixture redisFixture,
RabbitMqFixture rabbitMqFixture,
ITestOutputHelper outputHelper)
{
_verifierFixture = verifierFixture;
_postgresDatabaseFixture = postgresDatabaseFixture;
_client = new(() => new Registry.V1.RegistryService.RegistryServiceClient(serverFixture.Channel));
serverFixture.ConfigureHostConfiguration(new Dictionary<string, string?>()
{
{"Otlp:Enabled", "false"},
{"RegistryName", RegistryName},
{"ReturnComittedForFinalized", "true"},
{"Verifiers:project_origin.electricity.v1", _verifierFixture.Url},
{"ImmutableLog:type", "log"},
{"BlockFinalizer:Interval", "00:00:05"},
{"Persistence:type", "postgresql"},
{"ConnectionStrings:Database", _postgresDatabaseFixture.HostConnectionString},
{"Cache:Type", "redis"},
{"Cache:Redis:ConnectionString", redisFixture.HostConnectionString},
{"RabbitMq:Hostname", rabbitMqFixture.Hostname},
{"RabbitMq:AmqpPort", rabbitMqFixture.AmqpPort.ToString()},
{"RabbitMq:HttpApiPort", rabbitMqFixture.HttpApiPort.ToString()},
{"RabbitMq:Username", RabbitMqFixture.Username},
{"RabbitMq:Password", RabbitMqFixture.Password},
{"TransactionProcessor:ServerNumber", "0"},
{"TransactionProcessor:Servers", "1"},
{"TransactionProcessor:Threads", "5"},
{"TransactionProcessor:Weight", "10"},
});
}

[Fact]
public async Task issue_comsumption_certificate_success()
{
var owner = Algorithms.Secp256k1.GenerateNewPrivateKey();

var commitmentInfo = new SecretCommitmentInfo(250);
var certId = Guid.NewGuid();

IssuedEvent @event = Helper.CreateIssuedEvent(RegistryName, _verifierFixture.IssuerArea, owner.PublicKey, commitmentInfo, certId);

var transaction = Helper.SignTransaction(@event.CertificateId, @event, _verifierFixture.IssuerKey);

var status = await Client.GetStatus(transaction);
status.Status.Should().Be(Registry.V1.TransactionState.Unknown);

await Client.SendTransactions(transaction);
status = await Client.GetStatus(transaction);
status.Status.Should().Be(Registry.V1.TransactionState.Pending);

status = await Helper.RepeatUntilOrTimeout(
() => Client.GetStatus(transaction),
result => result.Status == Registry.V1.TransactionState.Committed,
TimeSpan.FromSeconds(60));

status.Message.Should().BeEmpty();

var stream = await Client.GetStream(certId);
stream.Transactions.Should().HaveCount(1);

await Task.Delay(10000);

var blocks = await Client.GetBlocksAsync(new Registry.V1.GetBlocksRequest
{
Skip = 0,
Limit = 1,
IncludeTransactions = false
});

blocks.Blocks.Should().HaveCount(1);
}
}
10 changes: 10 additions & 0 deletions test/ProjectOrigin.Registry.IntegrationTests/Helper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,16 @@ public static async Task SendTransactions(this Registry.V1.RegistryService.Regis
});
}

public static async Task<Registry.V1.GetBlocksResponse> GetBlock(this Registry.V1.RegistryService.RegistryServiceClient client)
{
return await client.GetBlocksAsync(new Registry.V1.GetBlocksRequest
{
Skip = 0,
Limit = 1,
IncludeTransactions = false
});
}

public static Registry.V1.Transaction SignTransaction(Common.V1.FederatedStreamId streamId, IMessage @event, IPrivateKey signerKey)
{
var header = new Registry.V1.TransactionHeader()
Expand Down
10 changes: 5 additions & 5 deletions test/ProjectOrigin.Registry.IntegrationTests/PerformanceTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public async Task TestThroughput()
{
if (queued.TryDequeue(out var transaction))
{
if (await IsCommitted(channel, transaction))
if (await IsFinalized(channel, transaction))
{
completed.Add(transaction);
}
Expand Down Expand Up @@ -170,7 +170,7 @@ public async Task TestLatencySequential()
{
stopwatch.Restart();
var transaction = await SendRequest(channel);
while (!await IsCommitted(channel, transaction))
while (!await IsFinalized(channel, transaction))
{
await Task.Delay(25);
}
Expand Down Expand Up @@ -213,7 +213,7 @@ public async Task TestLatencyParallel()

while (queued.TryDequeue(out var item))
{
if (await IsCommitted(channel, item.transaction))
if (await IsFinalized(channel, item.transaction))
{
item.stopwatch.Stop();
measurements.Add(item.stopwatch.ElapsedMilliseconds);
Expand Down Expand Up @@ -279,11 +279,11 @@ private async Task WriteRegistryContainerLog()
_outputHelper.WriteLine($"-------Container stdout------\n{log.Stdout}\n-------Container stderr------\n{log.Stderr}\n\n----------");
}

private static async Task<bool> IsCommitted(GrpcChannel channel, Registry.V1.Transaction transaction)
private static async Task<bool> IsFinalized(GrpcChannel channel, Registry.V1.Transaction transaction)
{
var client = new Registry.V1.RegistryService.RegistryServiceClient(channel);
var result = await client.GetStatus(transaction);
return result.Status == Registry.V1.TransactionState.Committed;
return result.Status == Registry.V1.TransactionState.Finalized;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public async Task Success()
_repository.Verify(obj => obj.CreateNextBlock(), Times.Exactly(1));
_blockPublisher.Verify(obj => obj.PublishBlock(It.Is<Registry.V1.BlockHeader>(x => x == header)), Times.Exactly(1));
_repository.Verify(obj => obj.FinalizeBlock(It.Is<BlockHash>(x => x.Equals(BlockHash.FromHeader(header))), It.Is<Registry.V1.BlockPublication>(x => x == publication)), Times.Exactly(1));
_statusService.Verify(obj => obj.SetTransactionStatus(It.IsAny<TransactionHash>(), It.Is<TransactionStatusRecord>(x => x.NewStatus == TransactionStatus.Committed)), Times.Exactly(transactions.Count));
_statusService.Verify(obj => obj.SetTransactionStatus(It.IsAny<TransactionHash>(), It.Is<TransactionStatusRecord>(x => x.NewStatus == TransactionStatus.Finalized)), Times.Exactly(transactions.Count));
}

[Fact]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public async Task ShouldNotSetLowerStatus()
}

[Fact]
public async Task ShouldReturnPendingRecordFromRepositoryNotInBlock()
public async Task ShouldReturnCommittedRecordFromRepositoryNotInBlock()
{
// Arrange
var data = _fixture.Create<byte[]>();
Expand All @@ -85,11 +85,11 @@ await _repository.Store(new StreamTransaction
var record = await Service.GetTransactionStatus(transactionHash);

// Assert
record.NewStatus.Should().Be(TransactionStatus.Pending);
record.NewStatus.Should().Be(TransactionStatus.Committed);
}

[Fact]
public async Task ShouldReturnCommittedRecordFromRepositoryInBlock()
public async Task ShouldReturnFinalizedRecordFromRepositoryInBlock()
{
// Arrange
var data = _fixture.Create<byte[]>();
Expand All @@ -109,6 +109,6 @@ await _repository.Store(new StreamTransaction
var record = await Service.GetTransactionStatus(transactionHash);

// Assert
record.NewStatus.Should().Be(TransactionStatus.Committed);
record.NewStatus.Should().Be(TransactionStatus.Finalized);
}
}

0 comments on commit 7dc92e0

Please sign in to comment.