Skip to content

Commit

Permalink
update to 8.0 for e2e tests (#592)
Browse files Browse the repository at this point in the history
* update to 8.0 for e2e tests

* Fixes for BuildSummary including:
* Use Protobuf.System.Text.Json to properly format dictionaries: https://stackoverflow.com/questions/66300807/protobuf-map-unserialization-issue
  * Update outbox handling to deal with the new Json Options
* FIx E2E test properly getting build info after build is complete
  • Loading branch information
johnml1135 authored Jan 6, 2025
1 parent a818da9 commit b1063c6
Show file tree
Hide file tree
Showing 14 changed files with 198 additions and 77 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci-e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ jobs:
- name: Setup .NET
uses: actions/setup-dotnet@v3
with:
dotnet-version: 6.0.x
dotnet-version: 8.0.x

- name: Start containers
run: dotnet build && docker compose -f "docker-compose.yml" up -d && sleep 20 #allow time for mongo to start up properly
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,14 @@ public class MessageOutboxOptions

public string OutboxDir { get; set; } = "outbox";
public TimeSpan MessageExpirationTimeout { get; set; } = TimeSpan.FromHours(48);

public static JsonSerializerOptions JsonSerializerOptions
{
get
{
JsonSerializerOptions options = new() { PropertyNamingPolicy = JsonNamingPolicy.CamelCase };
options.AddProtobufSupport();
return options;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
<PackageReference Include="Hangfire.Mongo" Version="1.10.8" />
<PackageReference Include="Microsoft.AspNetCore.Mvc.NewtonsoftJson" Version="8.0.8" />
<PackageReference Include="Microsoft.Extensions.Http.Polly" Version="8.0.8" />
<PackageReference Include="Protobuf.System.Text.Json" Version="1.4.0" />
<PackageReference Include="SIL.Machine" Version="3.5.2" Condition="!Exists('..\..\..\..\..\machine\src\SIL.Machine\SIL.Machine.csproj')" />
<PackageReference Include="SIL.Machine.Morphology.HermitCrab" Version="3.5.2" Condition="!Exists('..\..\..\..\..\machine\src\SIL.Machine.Morphology.HermitCrab\SIL.Machine.Morphology.HermitCrab.csproj')" />
<PackageReference Include="SIL.Machine.Translation.Thot" Version="3.5.2" Condition="!Exists('..\..\..\..\..\machine\src\SIL.Machine.Translation.Thot\SIL.Machine.Translation.Thot.csproj')" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,19 @@

public interface IMessageOutboxService
{
public Task<string> EnqueueMessageAsync(
public Task<string> EnqueueMessageAsync<TValue>(
string outboxId,
string method,
string groupId,
string? content = null,
Stream? contentStream = null,
TValue content,
CancellationToken cancellationToken = default
);

public Task<string> EnqueueMessageStreamAsync(
string outboxId,
string method,
string groupId,
Stream contentStream,
CancellationToken cancellationToken = default
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ public interface IOutboxMessageHandler
public string OutboxId { get; }

public Task HandleMessageAsync(
string groupId,
string method,
string? content,
Stream? contentStream,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,13 @@ internal async Task ProcessMessagesAsync(
{
try
{
await ProcessGroupMessagesAsync(messages, message, outboxMessageHandler, cancellationToken);
await ProcessGroupMessagesAsync(
messages,
message,
messageGroup.Key.GroupId,
outboxMessageHandler,
cancellationToken
);
}
catch (RpcException e)
{
Expand Down Expand Up @@ -98,6 +104,7 @@ internal async Task ProcessMessagesAsync(
private async Task ProcessGroupMessagesAsync(
IRepository<OutboxMessage> messages,
OutboxMessage message,
string groupId,
IOutboxMessageHandler outboxMessageHandler,
CancellationToken cancellationToken = default
)
Expand All @@ -109,6 +116,7 @@ private async Task ProcessGroupMessagesAsync(
try
{
await outboxMessageHandler.HandleMessageAsync(
groupId,
message.Method,
message.Content,
contentStream,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,20 @@ IOptionsMonitor<MessageOutboxOptions> options
private readonly IOptionsMonitor<MessageOutboxOptions> _options = options;
internal int MaxDocumentSize { get; set; } = 1_000_000;

public async Task<string> EnqueueMessageAsync(
public async Task<string> EnqueueMessageAsync<TValue>(
string outboxId,
string method,
string groupId,
string? content = null,
Stream? contentStream = null,
TValue content,
CancellationToken cancellationToken = default
)
{
if (content == null && contentStream == null)
throw new ArgumentException("Either content or contentStream must be specified.");
if (content is not null && content.Length > MaxDocumentSize)
string serializedContent = JsonSerializer.Serialize(content, MessageOutboxOptions.JsonSerializerOptions);
if (serializedContent.Length > MaxDocumentSize)
{
throw new ArgumentException(
$"The content is too large for request {method} with group ID {groupId}. "
+ $"It is {content.Length} bytes, but the maximum is {MaxDocumentSize} bytes."
+ $"It is {serializedContent.Length} bytes, but the maximum is {MaxDocumentSize} bytes."
);
}
Outbox outbox = (
Expand All @@ -49,24 +47,52 @@ await _outboxes.UpdateAsync(
OutboxRef = outboxId,
Method = method,
GroupId = groupId,
Content = content,
HasContentStream = contentStream is not null
Content = serializedContent,
HasContentStream = false
};
string filePath = Path.Combine(_options.CurrentValue.OutboxDir, outboxMessage.Id);
await _messages.InsertAsync(outboxMessage, cancellationToken: cancellationToken);
return outboxMessage.Id;
}

public async Task<string> EnqueueMessageStreamAsync(
string outboxId,
string method,
string groupId,
Stream contentStream,
CancellationToken cancellationToken = default
)
{
Outbox outbox = (
await _outboxes.UpdateAsync(
outboxId,
u => u.Inc(o => o.CurrentIndex, 1),
upsert: true,
cancellationToken: cancellationToken
)
)!;
OutboxMessage outboxMessage =
new()
{
Id = _idGenerator.GenerateId(),
Index = outbox.CurrentIndex,
OutboxRef = outboxId,
Method = method,
GroupId = groupId,
Content = null,
HasContentStream = true
};
string filePath = Path.Combine(_options.CurrentValue.OutboxDir, outboxMessage.Id);
try
{
if (contentStream is not null)
{
await using Stream fileStream = _fileSystem.OpenWrite(filePath);
await contentStream.CopyToAsync(fileStream, cancellationToken);
}
await using Stream fileStream = _fileSystem.OpenWrite(filePath);
await contentStream.CopyToAsync(fileStream, cancellationToken);
await _messages.InsertAsync(outboxMessage, cancellationToken: cancellationToken);
return outboxMessage.Id;
}
catch
{
if (contentStream is not null)
_fileSystem.DeleteFile(filePath);
_fileSystem.DeleteFile(filePath);
throw;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ public class ServalPlatformOutboxMessageHandler(TranslationPlatformApi.Translati
: IOutboxMessageHandler
{
private readonly TranslationPlatformApi.TranslationPlatformApiClient _client = client;
private static readonly JsonSerializerOptions JsonSerializerOptions =
new() { PropertyNamingPolicy = JsonNamingPolicy.CamelCase };
private readonly JsonSerializerOptions _jsonSerializerOptions = MessageOutboxOptions.JsonSerializerOptions;

public string OutboxId => ServalPlatformOutboxConstants.OutboxId;

public async Task HandleMessageAsync(
string groupId,
string method,
string? content,
Stream? contentStream,
Expand All @@ -22,39 +22,39 @@ public async Task HandleMessageAsync(
{
case ServalPlatformOutboxConstants.BuildStarted:
await _client.BuildStartedAsync(
JsonSerializer.Deserialize<BuildStartedRequest>(content!),
JsonSerializer.Deserialize<BuildStartedRequest>(content!, _jsonSerializerOptions),
cancellationToken: cancellationToken
);
break;
case ServalPlatformOutboxConstants.BuildCompleted:
await _client.BuildCompletedAsync(
JsonSerializer.Deserialize<BuildCompletedRequest>(content!),
JsonSerializer.Deserialize<BuildCompletedRequest>(content!, _jsonSerializerOptions),
cancellationToken: cancellationToken
);
break;
case ServalPlatformOutboxConstants.BuildCanceled:
await _client.BuildCanceledAsync(
JsonSerializer.Deserialize<BuildCanceledRequest>(content!),
JsonSerializer.Deserialize<BuildCanceledRequest>(content!, _jsonSerializerOptions),
cancellationToken: cancellationToken
);
break;
case ServalPlatformOutboxConstants.BuildFaulted:
await _client.BuildFaultedAsync(
JsonSerializer.Deserialize<BuildFaultedRequest>(content!),
JsonSerializer.Deserialize<BuildFaultedRequest>(content!, _jsonSerializerOptions),
cancellationToken: cancellationToken
);
break;
case ServalPlatformOutboxConstants.BuildRestarting:
await _client.BuildRestartingAsync(
JsonSerializer.Deserialize<BuildRestartingRequest>(content!),
JsonSerializer.Deserialize<BuildRestartingRequest>(content!, _jsonSerializerOptions),
cancellationToken: cancellationToken
);
break;
case ServalPlatformOutboxConstants.InsertPretranslations:
IAsyncEnumerable<Pretranslation> pretranslations = JsonSerializer
.DeserializeAsyncEnumerable<Pretranslation>(
contentStream!,
JsonSerializerOptions,
_jsonSerializerOptions,
cancellationToken
)
.OfType<Pretranslation>();
Expand All @@ -66,7 +66,7 @@ await _client.BuildRestartingAsync(
await call.RequestStream.WriteAsync(
new InsertPretranslationsRequest
{
EngineId = content!,
EngineId = groupId,
CorpusId = pretranslation.CorpusId,
TextId = pretranslation.TextId,
Refs = { pretranslation.Refs },
Expand All @@ -81,13 +81,16 @@ await call.RequestStream.WriteAsync(
break;
case ServalPlatformOutboxConstants.IncrementTranslationEngineCorpusSize:
await _client.IncrementTranslationEngineCorpusSizeAsync(
JsonSerializer.Deserialize<IncrementTranslationEngineCorpusSizeRequest>(content!),
JsonSerializer.Deserialize<IncrementTranslationEngineCorpusSizeRequest>(
content!,
_jsonSerializerOptions
),
cancellationToken: cancellationToken
);
break;
case ServalPlatformOutboxConstants.UpdateBuildExecutionData:
await _client.UpdateBuildExecutionDataAsync(
JsonSerializer.Deserialize<UpdateBuildExecutionDataRequest>(content!),
JsonSerializer.Deserialize<UpdateBuildExecutionDataRequest>(content!, _jsonSerializerOptions),
cancellationToken: cancellationToken
);
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ await _outboxService.EnqueueMessageAsync(
ServalPlatformOutboxConstants.OutboxId,
ServalPlatformOutboxConstants.BuildStarted,
buildId,
JsonSerializer.Serialize(new BuildStartedRequest { BuildId = buildId }),
new BuildStartedRequest { BuildId = buildId },
cancellationToken: cancellationToken
);
}
Expand All @@ -32,14 +32,12 @@ await _outboxService.EnqueueMessageAsync(
ServalPlatformOutboxConstants.OutboxId,
ServalPlatformOutboxConstants.BuildCompleted,
buildId,
JsonSerializer.Serialize(
new BuildCompletedRequest
{
BuildId = buildId,
CorpusSize = trainSize,
Confidence = confidence
}
),
new BuildCompletedRequest
{
BuildId = buildId,
CorpusSize = trainSize,
Confidence = confidence
},
cancellationToken: cancellationToken
);
}
Expand All @@ -50,7 +48,7 @@ await _outboxService.EnqueueMessageAsync(
ServalPlatformOutboxConstants.OutboxId,
ServalPlatformOutboxConstants.BuildCanceled,
buildId,
JsonSerializer.Serialize(new BuildCanceledRequest { BuildId = buildId }),
new BuildCanceledRequest { BuildId = buildId },
cancellationToken: cancellationToken
);
}
Expand All @@ -61,7 +59,7 @@ await _outboxService.EnqueueMessageAsync(
ServalPlatformOutboxConstants.OutboxId,
ServalPlatformOutboxConstants.BuildFaulted,
buildId,
JsonSerializer.Serialize(new BuildFaultedRequest { BuildId = buildId, Message = message }),
new BuildFaultedRequest { BuildId = buildId, Message = message },
cancellationToken: cancellationToken
);
}
Expand All @@ -72,7 +70,7 @@ await _outboxService.EnqueueMessageAsync(
ServalPlatformOutboxConstants.OutboxId,
ServalPlatformOutboxConstants.BuildRestarting,
buildId,
JsonSerializer.Serialize(new BuildRestartingRequest { BuildId = buildId }),
new BuildRestartingRequest { BuildId = buildId },
cancellationToken: cancellationToken
);
}
Expand Down Expand Up @@ -111,13 +109,12 @@ public async Task InsertPretranslationsAsync(
CancellationToken cancellationToken = default
)
{
await _outboxService.EnqueueMessageAsync(
await _outboxService.EnqueueMessageStreamAsync(
ServalPlatformOutboxConstants.OutboxId,
ServalPlatformOutboxConstants.InsertPretranslations,
engineId,
engineId,
pretranslationsStream,
cancellationToken: cancellationToken
cancellationToken
);
}

Expand All @@ -131,9 +128,7 @@ await _outboxService.EnqueueMessageAsync(
ServalPlatformOutboxConstants.OutboxId,
ServalPlatformOutboxConstants.IncrementTranslationEngineCorpusSize,
engineId,
JsonSerializer.Serialize(
new IncrementTranslationEngineCorpusSizeRequest { EngineId = engineId, Count = count }
),
new IncrementTranslationEngineCorpusSizeRequest { EngineId = engineId, Count = count },
cancellationToken: cancellationToken
);
}
Expand All @@ -151,7 +146,7 @@ await _outboxService.EnqueueMessageAsync(
ServalPlatformOutboxConstants.OutboxId,
ServalPlatformOutboxConstants.UpdateBuildExecutionData,
engineId,
JsonSerializer.Serialize(request),
request,
cancellationToken: cancellationToken
);
}
Expand Down
Loading

0 comments on commit b1063c6

Please sign in to comment.