From 95fff69833fd058303754a184aa2f4d00258ca61 Mon Sep 17 00:00:00 2001 From: Oleg Bespalov Date: Fri, 19 Apr 2024 11:11:39 +0200 Subject: [PATCH] grpc: AsyncInvoke implementation --- js/modules/k6/grpc/client.go | 31 +++++++++ js/modules/k6/grpc/client_test.go | 102 +++++++++++++++++++++++++++++- 2 files changed, 132 insertions(+), 1 deletion(-) diff --git a/js/modules/k6/grpc/client.go b/js/modules/k6/grpc/client.go index 796e3e6130c..8dd31804c31 100644 --- a/js/modules/k6/grpc/client.go +++ b/js/modules/k6/grpc/client.go @@ -287,6 +287,37 @@ func (c *Client) Invoke( return c.conn.Invoke(c.vu.Context(), grpcReq) } +// AsyncInvoke creates and calls a unary RPC by fully qualified method name asynchronously +func (c *Client) AsyncInvoke( + method string, + req goja.Value, + params goja.Value, +) *goja.Promise { + grpcReq, err := c.buildInvokeRequest(method, req, params) + + promise, resolve, reject := c.vu.Runtime().NewPromise() + if err != nil { + reject(err) + return promise + } + + callback := c.vu.RegisterCallback() + go func() { + res, err := c.conn.Invoke(c.vu.Context(), grpcReq) + + callback(func() error { + if err != nil { + reject(err) + return nil //nolint:nilerr // we don't want to return the error + } + resolve(res) + return nil + }) + }() + + return promise +} + // buildInvokeRequest creates a new InvokeRequest from the given method name, request object and parameters func (c *Client) buildInvokeRequest( method string, diff --git a/js/modules/k6/grpc/client_test.go b/js/modules/k6/grpc/client_test.go index 0f52caced11..663689a249d 100644 --- a/js/modules/k6/grpc/client_test.go +++ b/js/modules/k6/grpc/client_test.go @@ -223,6 +223,22 @@ func TestClient(t *testing.T) { err: `unknown param: "void"`, }, }, + { + name: "AsyncInvokeInvalidParam", + initString: codeBlock{code: ` + var client = new grpc.Client(); + client.load([], "../../../../lib/testutils/httpmultibin/grpc_testing/test.proto");`}, + vuString: codeBlock{ + code: ` + client.connect("GRPCBIN_ADDR"); + client.asyncInvoke("grpc.testing.TestService/EmptyCall", {}, { void: true }).then(function(resp) { + throw new Error("should not be here") + }, (err) => { + throw new Error(err) + })`, + err: `unknown param: "void"`, + }, + }, { name: "InvokeNilRequest", initString: codeBlock{code: ` @@ -317,6 +333,33 @@ func TestClient(t *testing.T) { }, }, }, + { + name: "AsyncInvoke", + initString: codeBlock{code: ` + var client = new grpc.Client(); + client.load([], "../../../../lib/testutils/httpmultibin/grpc_testing/test.proto");`}, + setup: func(tb *httpmultibin.HTTPMultiBin) { + tb.GRPCStub.EmptyCallFunc = func(context.Context, *grpc_testing.Empty) (*grpc_testing.Empty, error) { + return &grpc_testing.Empty{}, nil + } + }, + vuString: codeBlock{ + code: ` + client.connect("GRPCBIN_ADDR"); + client.asyncInvoke("grpc.testing.TestService/EmptyCall", {}).then(function(resp) { + if (resp.status !== grpc.StatusOK) { + throw new Error("unexpected error: " + JSON.stringify(resp.error) + "or status: " + resp.status) + } + }, (err) => { + throw new Error("unexpected error: " + err) + }) + `, + asserts: func(t *testing.T, rb *httpmultibin.HTTPMultiBin, samples chan metrics.SampleContainer, _ error) { + samplesBuf := metrics.GetBufferedSamples(samples) + assertMetricEmitted(t, metrics.GRPCReqDurationName, samplesBuf, rb.Replacer.Replace("GRPCBIN_ADDR/grpc.testing.TestService/EmptyCall")) + }, + }, + }, { name: "InvokeAnyProto", initString: codeBlock{code: ` @@ -387,6 +430,32 @@ func TestClient(t *testing.T) { throw new Error("server did not receive the correct request message") }`}, }, + { + name: "AsyncRequestMessage", + initString: codeBlock{ + code: ` + var client = new grpc.Client(); + client.load([], "../../../../lib/testutils/httpmultibin/grpc_testing/test.proto");`, + }, + setup: func(tb *httpmultibin.HTTPMultiBin) { + tb.GRPCStub.UnaryCallFunc = func(_ context.Context, req *grpc_testing.SimpleRequest) (*grpc_testing.SimpleResponse, error) { + if req.Payload == nil || string(req.Payload.Body) != "负载测试" { + return nil, status.Error(codes.InvalidArgument, "") + } + return &grpc_testing.SimpleResponse{}, nil + } + }, + vuString: codeBlock{code: ` + client.connect("GRPCBIN_ADDR"); + client.asyncInvoke("grpc.testing.TestService/UnaryCall", { payload: { body: "6LSf6L295rWL6K+V"} }).then(function(resp) { + if (resp.status !== grpc.StatusOK) { + throw new Error("server did not receive the correct request message") + } + }, (err) => { + throw new Error("unexpected error: " + err) + }); + `}, + }, { name: "RequestHeaders", initString: codeBlock{ @@ -464,6 +533,37 @@ func TestClient(t *testing.T) { }, }, }, + { + name: "AsyncResponseMessage", + initString: codeBlock{ + code: ` + var client = new grpc.Client(); + client.load([], "../../../../lib/testutils/httpmultibin/grpc_testing/test.proto");`, + }, + setup: func(tb *httpmultibin.HTTPMultiBin) { + tb.GRPCStub.UnaryCallFunc = func(context.Context, *grpc_testing.SimpleRequest) (*grpc_testing.SimpleResponse, error) { + return &grpc_testing.SimpleResponse{ + OauthScope: "水", + }, nil + } + }, + vuString: codeBlock{ + code: ` + client.connect("GRPCBIN_ADDR"); + client.asyncInvoke("grpc.testing.TestService/UnaryCall", {}).then(function(resp) { + if (!resp.message || resp.message.username !== "" || resp.message.oauthScope !== "水") { + throw new Error("unexpected response message: " + JSON.stringify(resp.message)) + } + }, (err) => { + throw new Error("unexpected error: " + err) + }); + `, + asserts: func(t *testing.T, rb *httpmultibin.HTTPMultiBin, samples chan metrics.SampleContainer, _ error) { + samplesBuf := metrics.GetBufferedSamples(samples) + assertMetricEmitted(t, metrics.GRPCReqDurationName, samplesBuf, rb.Replacer.Replace("GRPCBIN_ADDR/grpc.testing.TestService/UnaryCall")) + }, + }, + }, { name: "ResponseError", initString: codeBlock{ @@ -973,7 +1073,7 @@ func TestClient(t *testing.T) { assertResponse(t, tt.initString, err, val, ts) ts.ToVUContext() - val, err = ts.Run(tt.vuString.code) + val, err = ts.RunOnEventLoop(tt.vuString.code) assertResponse(t, tt.vuString, err, val, ts) }) }