Skip to content

Commit

Permalink
Add a second argument to callbacks for grpc streams with metadata
Browse files Browse the repository at this point in the history
Change updates the grpc stream callbacks to all receivers to be passed
a meta data object that includes the timestamp of the original event.
This change facilitates being able to accurately check the time when a
message was recevied by a client stream.
  • Loading branch information
cchamplin committed Jun 19, 2024
1 parent 41db838 commit a474555
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 10 deletions.
10 changes: 5 additions & 5 deletions js/modules/k6/grpc/listeners.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type eventListener struct {

// this return sobek.value *and* error in order to return error on exception instead of panic
// https://pkg.go.dev/github.com/dop251/goja#hdr-Functions
list []func(sobek.Value) (sobek.Value, error)
list []func(sobek.Value, sobek.Value) (sobek.Value, error)
}

// newListener creates a new listener of a certain type
Expand All @@ -38,7 +38,7 @@ func newListener(eventType string) *eventListener {
}

// add adds a listener to the listener list
func (l *eventListener) add(fn func(sobek.Value) (sobek.Value, error)) {
func (l *eventListener) add(fn func(sobek.Value, sobek.Value) (sobek.Value, error)) {
l.list = append(l.list, fn)
}

Expand All @@ -59,7 +59,7 @@ func (l *eventListeners) getType(t string) *eventListener {
}

// add adds a listener to the listeners
func (l *eventListeners) add(t string, f func(sobek.Value) (sobek.Value, error)) error {
func (l *eventListeners) add(t string, f func(sobek.Value, sobek.Value) (sobek.Value, error)) error {
list := l.getType(t)

if list == nil {
Expand All @@ -72,11 +72,11 @@ func (l *eventListeners) add(t string, f func(sobek.Value) (sobek.Value, error))
}

// all returns all possible listeners for a certain event type or an empty array
func (l *eventListeners) all(t string) []func(sobek.Value) (sobek.Value, error) {
func (l *eventListeners) all(t string) []func(sobek.Value, sobek.Value) (sobek.Value, error) {
list := l.getType(t)

if list == nil {
return []func(sobek.Value) (sobek.Value, error){}
return []func(sobek.Value, sobek.Value) (sobek.Value, error){}
}

return list.list
Expand Down
34 changes: 29 additions & 5 deletions js/modules/k6/grpc/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ const (
closed
)

const (
timestampMetadata = "ts"
)

type stream struct {
vu modules.VU
client *Client
Expand Down Expand Up @@ -143,12 +147,13 @@ func (s *stream) loop() {
}

func (s *stream) queueMessage(msg interface{}) {
now := time.Now()
metrics.PushIfNotDone(s.vu.Context(), s.vu.State().Samples, metrics.Sample{
TimeSeries: metrics.TimeSeries{
Metric: s.instanceMetrics.StreamsMessagesReceived,
Tags: s.tagsAndMeta.Tags,
},
Time: time.Now(),
Time: now,
Metadata: s.tagsAndMeta.Metadata,
Value: 1,
})
Expand All @@ -157,8 +162,14 @@ func (s *stream) queueMessage(msg interface{}) {
rt := s.vu.Runtime()
listeners := s.eventListeners.all(eventData)

metadataObj := rt.NewObject()
err := metadataObj.Set(timestampMetadata, rt.ToValue(now.Unix()))
if err != nil {
return err
}

for _, messageListener := range listeners {
if _, err := messageListener(rt.ToValue(msg)); err != nil {
if _, err := messageListener(rt.ToValue(msg), metadataObj); err != nil {
// TODO(olegbespalov) consider logging the error
_ = s.closeWithError(err)

Expand Down Expand Up @@ -294,7 +305,7 @@ func (s *stream) processSendError(err error) {
}

// on registers a handler for a certain event type
func (s *stream) on(event string, handler func(sobek.Value) (sobek.Value, error)) {
func (s *stream) on(event string, handler func(sobek.Value, sobek.Value) (sobek.Value, error)) {
if handler == nil {
common.Throw(s.vu.Runtime(), fmt.Errorf("handler for %q event isn't a callable function", event))
}
Expand Down Expand Up @@ -384,8 +395,15 @@ func (s *stream) callErrorListeners(e error) error {
s.logger.Warnf("no handlers for error registered, but an error happened: %s", e)
}

now := time.Now()
metadataObj := rt.NewObject()
err := metadataObj.Set(timestampMetadata, rt.ToValue(now.Unix()))
if err != nil {
return err
}

for _, errorListener := range list {
if _, err := errorListener(rt.ToValue(obj)); err != nil {
if _, err := errorListener(rt.ToValue(obj), metadataObj); err != nil {
return err
}
}
Expand Down Expand Up @@ -426,10 +444,16 @@ func extractError(e error) grpcError {
}

func (s *stream) callEventListeners(eventType string) error {
now := time.Now()
rt := s.vu.Runtime()

metadataObj := rt.NewObject()
err := metadataObj.Set(timestampMetadata, rt.ToValue(now.Unix()))
if err != nil {
return err
}
for _, listener := range s.eventListeners.all(eventType) {
if _, err := listener(rt.ToValue(struct{}{})); err != nil {
if _, err := listener(rt.ToValue(struct{}{}), metadataObj); err != nil {
return err
}
}
Expand Down
87 changes: 87 additions & 0 deletions js/modules/k6/grpc/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"io"
"strconv"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -271,6 +272,92 @@ func TestStream_ReceiveAllServerResponsesAfterEnd(t *testing.T) {
)
}

func TestStream_ReceiveMetadata(t *testing.T) {
t.Parallel()

ts := newTestState(t)

stub := &featureExplorerStub{}

savedFeatures := []*grpcservice.Feature{
{
Name: "foo",
Location: &grpcservice.Point{
Latitude: 1,
Longitude: 2,
},
},
{
Name: "bar",
Location: &grpcservice.Point{
Latitude: 3,
Longitude: 4,
},
},
}

stub.listFeatures = func(_ *grpcservice.Rectangle, stream grpcservice.FeatureExplorer_ListFeaturesServer) error {
for _, feature := range savedFeatures {
// adding a delay to make server response "slower"
time.Sleep(200 * time.Millisecond)

if err := stream.Send(feature); err != nil {
return err
}
}

return nil
}

grpcservice.RegisterFeatureExplorerServer(ts.httpBin.ServerGRPC, stub)

initString := codeBlock{
code: `
var client = new grpc.Client();
client.load([], "../../../../lib/testutils/grpcservice/route_guide.proto");`,
}
vuString := codeBlock{
code: `
client.connect("GRPCBIN_ADDR");
let stream = new grpc.Stream(client, "main.FeatureExplorer/ListFeatures")
stream.on('data', function (data, meta) {
call(meta.ts);
});
stream.on('end', function (_, meta) {
call(meta.ts);
});
stream.write({
lo: {
latitude: 1,
longitude: 2,
},
hi: {
latitude: 1,
longitude: 2,
},
});
stream.end();
`,
}

val, err := ts.Run(initString.code)
assertResponse(t, initString, err, val, ts)

ts.ToVUContext()

val, err = ts.RunOnEventLoop(vuString.code)

assertResponse(t, vuString, err, val, ts)

for _, call := range ts.callRecorder.Recorded() {
seconds, err := strconv.ParseInt(call, 10, 64)
assert.NoError(t, err)
metaTS := time.Unix(seconds, 0)
assert.WithinDuration(t, time.Now(), metaTS, 1*time.Minute)
}
}

// featureExplorerStub is a stub for FeatureExplorerServer
// it has ability to override methods
type featureExplorerStub struct {
Expand Down

0 comments on commit a474555

Please sign in to comment.