diff --git a/js/modules/k6/grpc/stream.go b/js/modules/k6/grpc/stream.go index 8b60a2ca216..6fa48162d85 100644 --- a/js/modules/k6/grpc/stream.go +++ b/js/modules/k6/grpc/stream.go @@ -75,11 +75,10 @@ func defineStream(rt *sobek.Runtime, s *stream) { } func (s *stream) beginStream(p *callParams) error { - tags := s.vu.State().Tags.GetCurrentValues() req := &grpcext.StreamRequest{ Method: s.method, MethodDescriptor: s.methodDescriptor, - TagsAndMeta: &tags, + TagsAndMeta: &p.TagsAndMeta, Metadata: p.Metadata, } diff --git a/js/modules/k6/grpc/stream_test.go b/js/modules/k6/grpc/stream_test.go index 67597ccdb33..25b8876f899 100644 --- a/js/modules/k6/grpc/stream_test.go +++ b/js/modules/k6/grpc/stream_test.go @@ -10,6 +10,7 @@ import ( "go.k6.io/k6/lib/testutils/grpcservice" "go.k6.io/k6/lib/testutils/httpmultibin/grpc_wrappers_testing" + "go.k6.io/k6/metrics" "github.com/golang/protobuf/ptypes/wrappers" "github.com/grafana/sobek" @@ -406,3 +407,108 @@ func TestStream_UndefinedHandler(t *testing.T) { require.ErrorContains(t, err, "handler for \"data\" event isn't a callable function") } + +// TestStream_MetricsTagsMetadata tests that the metrics tags are correctly +// added to samples. +func TestStream_MetricsTagsMetadata(t *testing.T) { + t.Parallel() + + ts := newTestState(t) + + stub := &featureExplorerStub{} + + savedFeatures := []*grpcservice.Feature{ + { + Name: "foo", + Location: &grpcservice.Point{ + Latitude: 1, + Longitude: 2, + }, + }, + { + Name: "bar", + Location: &grpcservice.Point{ + Latitude: 3, + Longitude: 4, + }, + }, + } + + stub.listFeatures = func(_ *grpcservice.Rectangle, stream grpcservice.FeatureExplorer_ListFeaturesServer) error { + for _, feature := range savedFeatures { + // adding a delay to make server response "slower" + time.Sleep(200 * time.Millisecond) + + if err := stream.Send(feature); err != nil { + return err + } + } + + return nil + } + + grpcservice.RegisterFeatureExplorerServer(ts.httpBin.ServerGRPC, stub) + + initString := codeBlock{ + code: ` + var client = new grpc.Client(); + client.load([], "../../../../lib/testutils/grpcservice/route_guide.proto");`, + } + vuString := codeBlock{ + code: ` + client.connect("GRPCBIN_ADDR"); + + let params = { + tags: { "tag1": "value1" }, + }; + + let stream = new grpc.Stream(client, "main.FeatureExplorer/ListFeatures", params) + stream.on('data', function (data) { + call('Feature:' + data.name); + }); + stream.on('end', function () { + call('End called'); + }); + + stream.write({ + lo: { + latitude: 1, + longitude: 2, + }, + hi: { + latitude: 1, + longitude: 2, + }, + }); + stream.end(); + `, + } + + val, err := ts.Run(initString.code) + assertResponse(t, initString, err, val, ts) + + ts.ToVUContext() + + val, err = ts.RunOnEventLoop(vuString.code) + + assertResponse(t, vuString, err, val, ts) + + expTags := map[string]string{"tag1": "value1"} + + samplesBuf := metrics.GetBufferedSamples(ts.samples) + + assert.Len(t, samplesBuf, 5) + for _, samples := range samplesBuf { + for _, sample := range samples.GetSamples() { + assertTags(t, sample, expTags) + } + } +} + +func assertTags(t *testing.T, sample metrics.Sample, tags map[string]string) { + for k, v := range tags { + tag, ok := sample.Tags.Get(k) + assert.True(t, ok) + assert.Equal(t, tag, v) + } +}