Skip to content

Commit

Permalink
Message handler fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
Enkidu93 committed Feb 26, 2025
1 parent ac0aae5 commit cda698f
Show file tree
Hide file tree
Showing 5 changed files with 152 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ public static class ServalWordAlignmentPlatformOutboxConstants
public const string BuildFaulted = "BuildFaulted";
public const string BuildRestarting = "BuildRestarting";
public const string IncrementTrainEngineCorpusSize = "IncrementTrainEngineCorpusSize";
public const string InsertInferenceResults = "InsertInferenceResults";
public const string InsertWordAlignmentResults = "InsertWordAlignmentResults";
public const string UpdateBuildExecutionData = "UpdateBuildExecutionData";
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,39 +23,45 @@ public async Task HandleMessageAsync(
switch (method)
{
case ServalWordAlignmentPlatformOutboxConstants.BuildStarted:
ArgumentNullException.ThrowIfNull(content);
await _client.BuildStartedAsync(
JsonSerializer.Deserialize<BuildStartedRequest>(content!),
JsonSerializer.Deserialize<BuildStartedRequest>(content, JsonSerializerOptions),
cancellationToken: cancellationToken
);
break;
case ServalWordAlignmentPlatformOutboxConstants.BuildCompleted:
ArgumentNullException.ThrowIfNull(content);
await _client.BuildCompletedAsync(
JsonSerializer.Deserialize<BuildCompletedRequest>(content!),
JsonSerializer.Deserialize<BuildCompletedRequest>(content, JsonSerializerOptions),
cancellationToken: cancellationToken
);
break;
case ServalWordAlignmentPlatformOutboxConstants.BuildCanceled:
ArgumentNullException.ThrowIfNull(content);
await _client.BuildCanceledAsync(
JsonSerializer.Deserialize<BuildCanceledRequest>(content!),
JsonSerializer.Deserialize<BuildCanceledRequest>(content, JsonSerializerOptions),
cancellationToken: cancellationToken
);
break;
case ServalWordAlignmentPlatformOutboxConstants.BuildFaulted:
ArgumentNullException.ThrowIfNull(content);
await _client.BuildFaultedAsync(
JsonSerializer.Deserialize<BuildFaultedRequest>(content!),
JsonSerializer.Deserialize<BuildFaultedRequest>(content, JsonSerializerOptions),
cancellationToken: cancellationToken
);
break;
case ServalWordAlignmentPlatformOutboxConstants.BuildRestarting:
ArgumentNullException.ThrowIfNull(content);
await _client.BuildRestartingAsync(
JsonSerializer.Deserialize<BuildRestartingRequest>(content!),
JsonSerializer.Deserialize<BuildRestartingRequest>(content, JsonSerializerOptions),
cancellationToken: cancellationToken
);
break;
case ServalWordAlignmentPlatformOutboxConstants.InsertInferenceResults:
case ServalWordAlignmentPlatformOutboxConstants.InsertWordAlignmentResults:
ArgumentNullException.ThrowIfNull(contentStream);
IAsyncEnumerable<Models.WordAlignment> wordAlignments = JsonSerializer
.DeserializeAsyncEnumerable<Models.WordAlignment>(
contentStream!,
contentStream,
JsonSerializerOptions,
cancellationToken
)
Expand Down Expand Up @@ -84,8 +90,9 @@ await call.RequestStream.WriteAsync(
}
break;
case ServalWordAlignmentPlatformOutboxConstants.IncrementTrainEngineCorpusSize:
ArgumentNullException.ThrowIfNull(content);
await _client.IncrementEngineCorpusSizeAsync(
JsonSerializer.Deserialize<IncrementEngineCorpusSizeRequest>(content!),
JsonSerializer.Deserialize<IncrementEngineCorpusSizeRequest>(content, JsonSerializerOptions),
cancellationToken: cancellationToken
);
break;
Expand All @@ -103,7 +110,8 @@ await _client.IncrementEngineCorpusSizeAsync(
yield return new WordAlignment.V1.AlignedWordPair
{
SourceIndex = alignedWordPair.SourceIndex,
TargetIndex = alignedWordPair.TargetIndex
TargetIndex = alignedWordPair.TargetIndex,
Score = alignedWordPair.TranslationScore
};
}
}
Expand Down Expand Up @@ -135,23 +143,23 @@ JsonSerializerOptions options
string s = reader.GetString()!;
switch (s)
{
case "corpus_id":
case "corpusId":
reader.Read();
corpusId = reader.GetString()!;
break;
case "text_id":
case "textId":
reader.Read();
textId = reader.GetString()!;
break;
case "refs":
reader.Read();
refs = JsonSerializer.Deserialize<IList<string>>(ref reader, options)!.ToArray();
break;
case "source_tokens":
case "sourceTokens":
reader.Read();
sourceTokens = JsonSerializer.Deserialize<IList<string>>(ref reader, options)!.ToArray();
break;
case "target_tokens":
case "targetTokens":
reader.Read();
targetTokens = JsonSerializer.Deserialize<IList<string>>(ref reader, options)!.ToArray();
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public async Task InsertInferenceResultsAsync(
{
await _outboxService.EnqueueMessageStreamAsync(
ServalWordAlignmentPlatformOutboxConstants.OutboxId,
ServalWordAlignmentPlatformOutboxConstants.InsertInferenceResults,
ServalWordAlignmentPlatformOutboxConstants.InsertWordAlignmentResults,
engineId,
wordAlignmentsStream,
cancellationToken: cancellationToken
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
namespace Serval.Machine.Shared.Services;

[TestFixture]
public class ServalPlatformOutboxMessageHandlerTests
public class ServalTranslationPlatformOutboxMessageHandlerTests
{
[Test]
public async Task HandleMessageAsync_BuildStarted()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
using Google.Protobuf.WellKnownTypes;
using Serval.WordAlignment.V1;

namespace Serval.Machine.Shared.Services;

[TestFixture]
public class ServalWordAlignmentPlatformOutboxMessageHandlerTests
{
[Test]
public async Task HandleMessageAsync_BuildStarted()
{
TestEnvironment env = new();

await env.Handler.HandleMessageAsync(
"groupId",
ServalWordAlignmentPlatformOutboxConstants.BuildStarted,
JsonSerializer.Serialize(
new BuildStartedRequest { BuildId = "C" },
MessageOutboxOptions.JsonSerializerOptions
),
null
);

_ = env.Client.Received(1).BuildStartedAsync(Arg.Is<BuildStartedRequest>(x => x.BuildId == "C"));
}

[Test]
public async Task HandleMessageAsync_InsertInferenceResults()
{
TestEnvironment env = new();

await using (MemoryStream stream = new())
{
await JsonSerializer.SerializeAsync(
stream,
new[]
{
new JsonObject
{
{ "corpusId", "corpus1" },
{ "textId", "MAT" },
{ "refs", new JsonArray(["MAT 1:1"]) },
{ "sourceTokens", new JsonArray(["sourceToken1"]) },
{ "targetTokens", new JsonArray(["targetToken1"]) },
{ "alignment", "0-0:1.0:1.0" }
}
},
MessageOutboxOptions.JsonSerializerOptions
);
stream.Seek(0, SeekOrigin.Begin);
await env.Handler.HandleMessageAsync(
"engine1",
ServalWordAlignmentPlatformOutboxConstants.InsertWordAlignmentResults,
"engine1",
stream
);
}

_ = env.Client.Received(1).InsertWordAlignments();
_ = env.WordAlignmentsWriter.Received(1)
.WriteAsync(
new InsertWordAlignmentsRequest
{
EngineId = "engine1",
CorpusId = "corpus1",
TextId = "MAT",
Refs = { "MAT 1:1" },
SourceTokens = { "sourceToken1" },
TargetTokens = { "targetToken1" },
Alignment =
{
new WordAlignment.V1.AlignedWordPair()
{
SourceIndex = 0,
TargetIndex = 0,
Score = 1.0
}
}
},
Arg.Any<CancellationToken>()
);
}

private class TestEnvironment
{
public TestEnvironment()
{
Client = Substitute.For<WordAlignmentPlatformApi.WordAlignmentPlatformApiClient>();
Client.BuildStartedAsync(Arg.Any<BuildStartedRequest>()).Returns(CreateEmptyUnaryCall());
Client.BuildCanceledAsync(Arg.Any<BuildCanceledRequest>()).Returns(CreateEmptyUnaryCall());
Client.BuildFaultedAsync(Arg.Any<BuildFaultedRequest>()).Returns(CreateEmptyUnaryCall());
Client.BuildCompletedAsync(Arg.Any<BuildCompletedRequest>()).Returns(CreateEmptyUnaryCall());
Client
.IncrementEngineCorpusSizeAsync(Arg.Any<IncrementEngineCorpusSizeRequest>())
.Returns(CreateEmptyUnaryCall());
WordAlignmentsWriter = Substitute.For<IClientStreamWriter<InsertWordAlignmentsRequest>>();
Client
.InsertWordAlignments(cancellationToken: Arg.Any<CancellationToken>())
.Returns(
TestCalls.AsyncClientStreamingCall(
WordAlignmentsWriter,
Task.FromResult(new Empty()),
Task.FromResult(new Metadata()),
() => Status.DefaultSuccess,
() => new Metadata(),
() => { }
)
);

Handler = new ServalWordAlignmentPlatformOutboxMessageHandler(Client);
}

public WordAlignmentPlatformApi.WordAlignmentPlatformApiClient Client { get; }
public ServalWordAlignmentPlatformOutboxMessageHandler Handler { get; }
public IClientStreamWriter<InsertWordAlignmentsRequest> WordAlignmentsWriter { get; }

private static AsyncUnaryCall<Empty> CreateEmptyUnaryCall()
{
return new AsyncUnaryCall<Empty>(
Task.FromResult(new Empty()),
Task.FromResult(new Metadata()),
() => Status.DefaultSuccess,
() => new Metadata(),
() => { }
);
}
}
}

0 comments on commit cda698f

Please sign in to comment.