From 25934e280ad0845fa8c12d99631f4acf8d62340c Mon Sep 17 00:00:00 2001 From: Catcher Wong Date: Sat, 19 Feb 2022 11:56:39 +0800 Subject: [PATCH] feat: grpc call timeout/deadline (#2) --- src/Dtmgrpc/DtmgRPCClient.cs | 23 +++++++++++++------ .../GrpcCallInvokerTests.cs | 16 ++++++++++--- .../Dtmgrpc.IntegrationTests/ITTestHelper.cs | 3 ++- 3 files changed, 31 insertions(+), 11 deletions(-) rename tests/{Dtmgrpc.Tests => Dtmgrpc.IntegrationTests}/GrpcCallInvokerTests.cs (64%) diff --git a/src/Dtmgrpc/DtmgRPCClient.cs b/src/Dtmgrpc/DtmgRPCClient.cs index b2b06f8..16abed6 100644 --- a/src/Dtmgrpc/DtmgRPCClient.cs +++ b/src/Dtmgrpc/DtmgRPCClient.cs @@ -4,6 +4,7 @@ using Google.Protobuf.WellKnownTypes; using Grpc.Core; using Grpc.Net.Client; +using Microsoft.Extensions.Options; using System; using System.Collections.Generic; using System.Threading.Tasks; @@ -19,10 +20,12 @@ public class DtmgRPCClient : IDtmgRPCClient private static readonly string HTTPPrefix = "http://"; private readonly Driver.IDtmDriver _dtmDriver; + private readonly DtmOptions _options; - public DtmgRPCClient(Driver.IDtmDriver dtmDriver) + public DtmgRPCClient(Driver.IDtmDriver dtmDriver, IOptions optionsAccs) { this._dtmDriver = dtmDriver; + this._options = optionsAccs.Value; } public async Task DtmGrpcCall(TransBase transBase, string operation) @@ -31,14 +34,18 @@ public async Task DtmGrpcCall(TransBase transBase, string operation) var method = new Method(MethodType.Unary, DtmServiceName, operation, DtmRequestMarshaller, DtmReplyMarshaller); using var channel = GrpcChannel.ForAddress(transBase.Dtm); - await channel.CreateCallInvoker().AsyncUnaryCall(method, string.Empty, new CallOptions(), dtmRequest); + var callOptions = new CallOptions() + .WithDeadline(DateTime.UtcNow.AddMilliseconds(_options.DtmTimeout)); + await channel.CreateCallInvoker().AsyncUnaryCall(method, string.Empty, callOptions, dtmRequest); } public async Task GenGid(string grpcServer) { using var channel = GrpcChannel.ForAddress(grpcServer); var client = new dtmgpb.Dtm.DtmClient(channel); - var reply = await client.NewGidAsync(new Empty(), new CallOptions()); + var callOptions = new CallOptions() + .WithDeadline(DateTime.UtcNow.AddMilliseconds(_options.DtmTimeout)); + var reply = await client.NewGidAsync(new Empty(), callOptions); return reply.Gid; } @@ -59,9 +66,9 @@ public async Task InvokeBranch(TransBase tb, TRe var grpcMethod = Utils.CreateMethod(MethodType.Unary, serviceName, method); var metadata = Utils.TransInfo2Metadata(tb.Gid, tb.TransType, branchId, op, tb.Dtm); - var callOptions = new CallOptions(); - callOptions.WithHeaders(metadata); - + var callOptions = new CallOptions() + .WithHeaders(metadata) + .WithDeadline(DateTime.UtcNow.AddMilliseconds(_options.BranchTimeout)); var resp = await channel.CreateCallInvoker().AsyncUnaryCall(grpcMethod, string.Empty, callOptions, msg); return resp; } @@ -80,7 +87,9 @@ public async Task RegisterBranch(TransBase tb, string branchId, ByteString bd, D using var channel = GrpcChannel.ForAddress(tb.Dtm); var client = new dtmgpb.Dtm.DtmClient(channel); - await client.RegisterBranchAsync(request, new CallOptions()); + var callOptions = new CallOptions() + .WithDeadline(DateTime.UtcNow.AddMilliseconds(_options.DtmTimeout)); + await client.RegisterBranchAsync(request, callOptions); } public TransBase TransBaseFromGrpc(ServerCallContext context) diff --git a/tests/Dtmgrpc.Tests/GrpcCallInvokerTests.cs b/tests/Dtmgrpc.IntegrationTests/GrpcCallInvokerTests.cs similarity index 64% rename from tests/Dtmgrpc.Tests/GrpcCallInvokerTests.cs rename to tests/Dtmgrpc.IntegrationTests/GrpcCallInvokerTests.cs index 0f83509..1264a3f 100644 --- a/tests/Dtmgrpc.Tests/GrpcCallInvokerTests.cs +++ b/tests/Dtmgrpc.IntegrationTests/GrpcCallInvokerTests.cs @@ -2,28 +2,38 @@ using Google.Protobuf.WellKnownTypes; using Grpc.Core; using Grpc.Net.Client; +using Microsoft.Extensions.DependencyInjection; using System; using System.Threading.Tasks; using Xunit; -namespace Dtmgrpc.Tests +namespace Dtmgrpc.IntegrationTests { public class GrpcCallInvokerTests { private static readonly Marshaller DtmGidReplyMarshaller = Marshallers.Create(r => r.ToByteArray(), data => dtmgpb.DtmGidReply.Parser.ParseFrom(data)); private static readonly Marshaller EmptyMarshaller = Marshallers.Create(r => r.ToByteArray(), data => Empty.Parser.ParseFrom(data)); - [Fact(Skip = "local")] + [Fact] public async Task CallInvokerAsync() { AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true); var method = new Method(MethodType.Unary, "dtmgimp.Dtm", "NewGid", EmptyMarshaller, DtmGidReplyMarshaller); - using var channel = GrpcChannel.ForAddress("http://localhost:36790"); + using var channel = GrpcChannel.ForAddress(ITTestHelper.DTMgRPCUrl); var resp = await channel.CreateCallInvoker().AsyncUnaryCall(method, string.Empty, new CallOptions(), new Empty()); Assert.NotNull(resp.Gid); } + + [Fact] + public async Task DtmTimoutTest() + { + var provider = ITTestHelper.AddDtmGrpc(1); + var client = provider.GetRequiredService(); + var ex = await Assert.ThrowsAsync(async () => await client.GenGid(ITTestHelper.DTMgRPCUrl)); + Assert.Equal(StatusCode.DeadlineExceeded, ex.StatusCode); + } } } diff --git a/tests/Dtmgrpc.IntegrationTests/ITTestHelper.cs b/tests/Dtmgrpc.IntegrationTests/ITTestHelper.cs index e42e4a4..2630ee1 100644 --- a/tests/Dtmgrpc.IntegrationTests/ITTestHelper.cs +++ b/tests/Dtmgrpc.IntegrationTests/ITTestHelper.cs @@ -57,7 +57,7 @@ public static busi.BusiReq GenBusiReq(bool outFailed, bool inFailed) }; } - public static ServiceProvider AddDtmGrpc() + public static ServiceProvider AddDtmGrpc(int dtmTimout = 10000) { AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true); var services = new ServiceCollection(); @@ -65,6 +65,7 @@ public static ServiceProvider AddDtmGrpc() services.AddDtmGrpc(x => { x.DtmGrpcUrl = DTMgRPCUrl; + x.DtmTimeout = dtmTimout; }); var provider = services.BuildServiceProvider();