From 541c814d70122d0829ec27d65f4078298be107a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joan=20L=C3=B3pez=20de=20la=20Franca=20Beltran?= Date: Tue, 14 May 2024 13:37:03 +0200 Subject: [PATCH 1/5] Support to init a ReadableStream from a io.Reader --- js/modules/k6/experimental/streams/module.go | 81 ++++++++++++++++---- 1 file changed, 65 insertions(+), 16 deletions(-) diff --git a/js/modules/k6/experimental/streams/module.go b/js/modules/k6/experimental/streams/module.go index 51018a5ea79..c91d874d6c0 100644 --- a/js/modules/k6/experimental/streams/module.go +++ b/js/modules/k6/experimental/streams/module.go @@ -2,6 +2,9 @@ package streams import ( + "errors" + "io" + "github.com/dop251/goja" "go.k6.io/k6/js/common" "go.k6.io/k6/js/modules" @@ -46,7 +49,12 @@ func (mi *ModuleInstance) Exports() modules.Exports { // NewReadableStream is the constructor for the ReadableStream object. func (mi *ModuleInstance) NewReadableStream(call goja.ConstructorCall) *goja.Object { - rt := mi.vu.Runtime() + return newReadableStream(mi.vu, call) +} + +func newReadableStream(vu modules.VU, call goja.ConstructorCall) *goja.Object { + rt := vu.Runtime() + var err error // 1. If underlyingSource is missing, set it to null. @@ -60,7 +68,7 @@ func (mi *ModuleInstance) NewReadableStream(call goja.ConstructorCall) *goja.Obj // We look for the queuing strategy first, and validate it before // the underlying source, in order to pass the Web Platform Tests // constructor tests. - strategy = mi.initializeStrategy(call) + strategy = initializeStrategy(rt, call) // 2. Let underlyingSourceDict be underlyingSource, converted to an IDL value of type UnderlyingSource. if len(call.Arguments) > 0 && !goja.IsUndefined(call.Arguments[0]) { @@ -79,8 +87,8 @@ func (mi *ModuleInstance) NewReadableStream(call goja.ConstructorCall) *goja.Obj // 3. Perform ! InitializeReadableStream(this). stream := &ReadableStream{ - runtime: mi.vu.Runtime(), - vu: mi.vu, + runtime: rt, + vu: vu, } stream.initialize() @@ -129,27 +137,25 @@ func (mi *ModuleInstance) NewReadableStream(call goja.ConstructorCall) *goja.Obj } func defaultSizeFunc(_ goja.Value) (float64, error) { return 1.0, nil } -func (mi *ModuleInstance) initializeStrategy(call goja.ConstructorCall) *goja.Object { - runtime := mi.vu.Runtime() - +func initializeStrategy(rt *goja.Runtime, call goja.ConstructorCall) *goja.Object { // Either if the strategy is not provided or if it doesn't have a 'highWaterMark', // we need to set its default value (highWaterMark=1). // https://streams.spec.whatwg.org/#rs-prototype - strArg := runtime.NewObject() + strArg := rt.NewObject() if len(call.Arguments) > 1 && !common.IsNullish(call.Arguments[1]) { - strArg = call.Arguments[1].ToObject(runtime) + strArg = call.Arguments[1].ToObject(rt) } if common.IsNullish(strArg.Get("highWaterMark")) { - if err := strArg.Set("highWaterMark", runtime.ToValue(1)); err != nil { - common.Throw(runtime, newError(RuntimeError, err.Error())) + if err := strArg.Set("highWaterMark", rt.ToValue(1)); err != nil { + common.Throw(rt, newError(RuntimeError, err.Error())) } } // If the stream type is 'bytes', we don't want the size function. // Except, when it is manually specified. - size := runtime.ToValue(defaultSizeFunc) + size := rt.ToValue(defaultSizeFunc) if len(call.Arguments) > 0 && !common.IsNullish(call.Arguments[0]) { - srcArg := call.Arguments[0].ToObject(runtime) + srcArg := call.Arguments[0].ToObject(rt) srcTypeArg := srcArg.Get("type") if !common.IsNullish(srcTypeArg) && srcTypeArg.String() == ReadableStreamTypeBytes { size = nil @@ -160,7 +166,7 @@ func (mi *ModuleInstance) initializeStrategy(call goja.ConstructorCall) *goja.Ob } strCall := goja.ConstructorCall{Arguments: []goja.Value{strArg}} - return mi.newCountQueuingStrategy(runtime, strCall, size) + return newCountQueuingStrategy(rt, strCall, size) } // NewCountQueuingStrategy is the constructor for the [CountQueuingStrategy] object. @@ -170,14 +176,14 @@ func (mi *ModuleInstance) NewCountQueuingStrategy(call goja.ConstructorCall) *go rt := mi.vu.Runtime() // By default, the CountQueuingStrategy has a pre-defined 'size' property. // It cannot be overwritten by the user. - return mi.newCountQueuingStrategy(rt, call, rt.ToValue(defaultSizeFunc)) + return newCountQueuingStrategy(rt, call, rt.ToValue(defaultSizeFunc)) } // newCountQueuingStrategy is the underlying constructor for the [CountQueuingStrategy] object. // // It allows to create a CountQueuingStrategy with or without the 'size' property, // depending on how the containing ReadableStream is initialized. -func (mi *ModuleInstance) newCountQueuingStrategy( +func newCountQueuingStrategy( rt *goja.Runtime, call goja.ConstructorCall, size goja.Value, @@ -285,3 +291,46 @@ func (mi *ModuleInstance) NewReadableStreamDefaultReader(call goja.ConstructorCa return object } + +func NewReadableStreamForReader(vu modules.VU, reader io.Reader) *goja.Object { + rt := vu.Runtime() + return newReadableStream(vu, goja.ConstructorCall{ + Arguments: []goja.Value{rt.ToValue(underlyingSourceForReader(vu, reader))}, + This: rt.NewObject(), + }) +} + +func underlyingSourceForReader(vu modules.VU, reader io.Reader) *goja.Object { + rt := vu.Runtime() + + underlyingSource := vu.Runtime().NewObject() + if err := underlyingSource.Set("pull", rt.ToValue(func(controller *goja.Object) *goja.Promise { + // Prepare methods + cClose, _ := goja.AssertFunction(controller.Get("close")) + cEnqueue, _ := goja.AssertFunction(controller.Get("enqueue")) + + buf := make([]byte, 1024) + n, err := reader.Read(buf) + if err != nil && !errors.Is(err, io.EOF) { + panic(err) + } + + _, enqueueErr := cEnqueue(nil, rt.ToValue(string(buf[:n]))) + if enqueueErr != nil { + panic(enqueueErr) + } + + if err == io.EOF { + _, closeErr := cClose(nil) + if closeErr != nil { + panic(closeErr) + } + } + + return newResolvedPromise(vu, goja.Undefined()) + })); err != nil { + throw(rt, err) + } + + return underlyingSource +} From 9cbf4513b2aaba84a05aac3eeca8409f81ac417f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joan=20L=C3=B3pez=20de=20la=20Franca=20Beltran?= Date: Thu, 30 May 2024 13:11:03 +0200 Subject: [PATCH 2/5] streams.NewReadableStreamForReader test --- .../k6/experimental/streams/module_test.go | 39 +++++++++++++++++++ 1 file changed, 39 insertions(+) create mode 100644 js/modules/k6/experimental/streams/module_test.go diff --git a/js/modules/k6/experimental/streams/module_test.go b/js/modules/k6/experimental/streams/module_test.go new file mode 100644 index 00000000000..3c23c35e784 --- /dev/null +++ b/js/modules/k6/experimental/streams/module_test.go @@ -0,0 +1,39 @@ +package streams + +import ( + "bytes" + "testing" + + "github.com/dop251/goja" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.k6.io/k6/js/modulestest" +) + +func TestNewReadableStreamForReader(t *testing.T) { + // The value to be streamed. + exp := "Hello, World!" + + // We initialize the runtime, with the ReadableStream(rs) accessible in JS. + r := modulestest.NewRuntime(t) + rs := NewReadableStreamForReader(r.VU, bytes.NewReader([]byte(exp))) + require.NoError(t, r.VU.Runtime().Set("rs", rs)) + + // Then, we run some JS code that reads from the ReadableStream(rs). + var ret goja.Value + err := r.EventLoop.Start(func() (err error) { + ret, err = r.VU.Runtime().RunString(`(async () => { + const reader = rs.getReader(); + const {value} = await reader.read(); + return value; +})()`) + return err + }) + assert.NoError(t, err) + + // Finally, we expect the returned promise to resolve + // to the expected value (the one we streamed). + p, ok := ret.Export().(*goja.Promise) + require.True(t, ok) + assert.Equal(t, exp, p.Result().String()) +} From b578b7eaaefe6356f62f4257b08cd7d4e43b290e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joan=20L=C3=B3pez=20de=20la=20Franca=20Beltran?= Date: Thu, 30 May 2024 13:22:06 +0200 Subject: [PATCH 3/5] Fix linter complains --- js/modules/k6/experimental/streams/module.go | 14 ++++++++------ js/modules/k6/experimental/streams/module_test.go | 2 ++ 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/js/modules/k6/experimental/streams/module.go b/js/modules/k6/experimental/streams/module.go index c91d874d6c0..672b6c117c3 100644 --- a/js/modules/k6/experimental/streams/module.go +++ b/js/modules/k6/experimental/streams/module.go @@ -53,14 +53,13 @@ func (mi *ModuleInstance) NewReadableStream(call goja.ConstructorCall) *goja.Obj } func newReadableStream(vu modules.VU, call goja.ConstructorCall) *goja.Object { - rt := vu.Runtime() - - var err error + var ( + // 1. If underlyingSource is missing, set it to null. + underlyingSource *goja.Object - // 1. If underlyingSource is missing, set it to null. - var underlyingSource *goja.Object + rt = vu.Runtime() - var ( + err error strategy *goja.Object underlyingSourceDict UnderlyingSource ) @@ -292,6 +291,9 @@ func (mi *ModuleInstance) NewReadableStreamDefaultReader(call goja.ConstructorCa return object } +// NewReadableStreamForReader is the equivalent of [NewReadableStreamDefaultReader] but to initialize +// a new [ReadableStream] from a given [io.Reader] in Go code. +// It is useful for those situations when a [io.Reader] needs to be surfaced up to the JS runtime. func NewReadableStreamForReader(vu modules.VU, reader io.Reader) *goja.Object { rt := vu.Runtime() return newReadableStream(vu, goja.ConstructorCall{ diff --git a/js/modules/k6/experimental/streams/module_test.go b/js/modules/k6/experimental/streams/module_test.go index 3c23c35e784..4576c218c33 100644 --- a/js/modules/k6/experimental/streams/module_test.go +++ b/js/modules/k6/experimental/streams/module_test.go @@ -11,6 +11,8 @@ import ( ) func TestNewReadableStreamForReader(t *testing.T) { + t.Parallel() + // The value to be streamed. exp := "Hello, World!" From 28a6b50b43a98ac4bbdec1add3e6780460458e5a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joan=20L=C3=B3pez=20de=20la=20Franca=20Beltran?= <5459617+joanlopez@users.noreply.github.com> Date: Thu, 6 Jun 2024 22:43:38 +0200 Subject: [PATCH 4/5] Apply suggestions from code review Co-authored-by: Ivan <2103732+codebien@users.noreply.github.com> --- js/modules/k6/experimental/streams/module.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/js/modules/k6/experimental/streams/module.go b/js/modules/k6/experimental/streams/module.go index 672b6c117c3..f10e7282ae3 100644 --- a/js/modules/k6/experimental/streams/module.go +++ b/js/modules/k6/experimental/streams/module.go @@ -294,7 +294,7 @@ func (mi *ModuleInstance) NewReadableStreamDefaultReader(call goja.ConstructorCa // NewReadableStreamForReader is the equivalent of [NewReadableStreamDefaultReader] but to initialize // a new [ReadableStream] from a given [io.Reader] in Go code. // It is useful for those situations when a [io.Reader] needs to be surfaced up to the JS runtime. -func NewReadableStreamForReader(vu modules.VU, reader io.Reader) *goja.Object { +func NewReadableStreamFromReader(vu modules.VU, reader io.Reader) *goja.Object { rt := vu.Runtime() return newReadableStream(vu, goja.ConstructorCall{ Arguments: []goja.Value{rt.ToValue(underlyingSourceForReader(vu, reader))}, From 3b681e657fbfa59481fd687c28976cf8c78de7de Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joan=20L=C3=B3pez=20de=20la=20Franca=20Beltran?= Date: Thu, 6 Jun 2024 23:01:51 +0200 Subject: [PATCH 5/5] Replace goja => sobek --- js/modules/k6/experimental/streams/module_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/js/modules/k6/experimental/streams/module_test.go b/js/modules/k6/experimental/streams/module_test.go index 4576c218c33..cfe859fd85f 100644 --- a/js/modules/k6/experimental/streams/module_test.go +++ b/js/modules/k6/experimental/streams/module_test.go @@ -4,7 +4,7 @@ import ( "bytes" "testing" - "github.com/dop251/goja" + "github.com/grafana/sobek" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.k6.io/k6/js/modulestest" @@ -18,11 +18,11 @@ func TestNewReadableStreamForReader(t *testing.T) { // We initialize the runtime, with the ReadableStream(rs) accessible in JS. r := modulestest.NewRuntime(t) - rs := NewReadableStreamForReader(r.VU, bytes.NewReader([]byte(exp))) + rs := NewReadableStreamFromReader(r.VU, bytes.NewReader([]byte(exp))) require.NoError(t, r.VU.Runtime().Set("rs", rs)) // Then, we run some JS code that reads from the ReadableStream(rs). - var ret goja.Value + var ret sobek.Value err := r.EventLoop.Start(func() (err error) { ret, err = r.VU.Runtime().RunString(`(async () => { const reader = rs.getReader(); @@ -35,7 +35,7 @@ func TestNewReadableStreamForReader(t *testing.T) { // Finally, we expect the returned promise to resolve // to the expected value (the one we streamed). - p, ok := ret.Export().(*goja.Promise) + p, ok := ret.Export().(*sobek.Promise) require.True(t, ok) assert.Equal(t, exp, p.Result().String()) }