diff --git a/js/modules/k6/experimental/streams/module.go b/js/modules/k6/experimental/streams/module.go index 51018a5ea79..c91d874d6c0 100644 --- a/js/modules/k6/experimental/streams/module.go +++ b/js/modules/k6/experimental/streams/module.go @@ -2,6 +2,9 @@ package streams import ( + "errors" + "io" + "github.com/dop251/goja" "go.k6.io/k6/js/common" "go.k6.io/k6/js/modules" @@ -46,7 +49,12 @@ func (mi *ModuleInstance) Exports() modules.Exports { // NewReadableStream is the constructor for the ReadableStream object. func (mi *ModuleInstance) NewReadableStream(call goja.ConstructorCall) *goja.Object { - rt := mi.vu.Runtime() + return newReadableStream(mi.vu, call) +} + +func newReadableStream(vu modules.VU, call goja.ConstructorCall) *goja.Object { + rt := vu.Runtime() + var err error // 1. If underlyingSource is missing, set it to null. @@ -60,7 +68,7 @@ func (mi *ModuleInstance) NewReadableStream(call goja.ConstructorCall) *goja.Obj // We look for the queuing strategy first, and validate it before // the underlying source, in order to pass the Web Platform Tests // constructor tests. - strategy = mi.initializeStrategy(call) + strategy = initializeStrategy(rt, call) // 2. Let underlyingSourceDict be underlyingSource, converted to an IDL value of type UnderlyingSource. if len(call.Arguments) > 0 && !goja.IsUndefined(call.Arguments[0]) { @@ -79,8 +87,8 @@ func (mi *ModuleInstance) NewReadableStream(call goja.ConstructorCall) *goja.Obj // 3. Perform ! InitializeReadableStream(this). stream := &ReadableStream{ - runtime: mi.vu.Runtime(), - vu: mi.vu, + runtime: rt, + vu: vu, } stream.initialize() @@ -129,27 +137,25 @@ func (mi *ModuleInstance) NewReadableStream(call goja.ConstructorCall) *goja.Obj } func defaultSizeFunc(_ goja.Value) (float64, error) { return 1.0, nil } -func (mi *ModuleInstance) initializeStrategy(call goja.ConstructorCall) *goja.Object { - runtime := mi.vu.Runtime() - +func initializeStrategy(rt *goja.Runtime, call goja.ConstructorCall) *goja.Object { // Either if the strategy is not provided or if it doesn't have a 'highWaterMark', // we need to set its default value (highWaterMark=1). // https://streams.spec.whatwg.org/#rs-prototype - strArg := runtime.NewObject() + strArg := rt.NewObject() if len(call.Arguments) > 1 && !common.IsNullish(call.Arguments[1]) { - strArg = call.Arguments[1].ToObject(runtime) + strArg = call.Arguments[1].ToObject(rt) } if common.IsNullish(strArg.Get("highWaterMark")) { - if err := strArg.Set("highWaterMark", runtime.ToValue(1)); err != nil { - common.Throw(runtime, newError(RuntimeError, err.Error())) + if err := strArg.Set("highWaterMark", rt.ToValue(1)); err != nil { + common.Throw(rt, newError(RuntimeError, err.Error())) } } // If the stream type is 'bytes', we don't want the size function. // Except, when it is manually specified. - size := runtime.ToValue(defaultSizeFunc) + size := rt.ToValue(defaultSizeFunc) if len(call.Arguments) > 0 && !common.IsNullish(call.Arguments[0]) { - srcArg := call.Arguments[0].ToObject(runtime) + srcArg := call.Arguments[0].ToObject(rt) srcTypeArg := srcArg.Get("type") if !common.IsNullish(srcTypeArg) && srcTypeArg.String() == ReadableStreamTypeBytes { size = nil @@ -160,7 +166,7 @@ func (mi *ModuleInstance) initializeStrategy(call goja.ConstructorCall) *goja.Ob } strCall := goja.ConstructorCall{Arguments: []goja.Value{strArg}} - return mi.newCountQueuingStrategy(runtime, strCall, size) + return newCountQueuingStrategy(rt, strCall, size) } // NewCountQueuingStrategy is the constructor for the [CountQueuingStrategy] object. @@ -170,14 +176,14 @@ func (mi *ModuleInstance) NewCountQueuingStrategy(call goja.ConstructorCall) *go rt := mi.vu.Runtime() // By default, the CountQueuingStrategy has a pre-defined 'size' property. // It cannot be overwritten by the user. - return mi.newCountQueuingStrategy(rt, call, rt.ToValue(defaultSizeFunc)) + return newCountQueuingStrategy(rt, call, rt.ToValue(defaultSizeFunc)) } // newCountQueuingStrategy is the underlying constructor for the [CountQueuingStrategy] object. // // It allows to create a CountQueuingStrategy with or without the 'size' property, // depending on how the containing ReadableStream is initialized. -func (mi *ModuleInstance) newCountQueuingStrategy( +func newCountQueuingStrategy( rt *goja.Runtime, call goja.ConstructorCall, size goja.Value, @@ -285,3 +291,46 @@ func (mi *ModuleInstance) NewReadableStreamDefaultReader(call goja.ConstructorCa return object } + +func NewReadableStreamForReader(vu modules.VU, reader io.Reader) *goja.Object { + rt := vu.Runtime() + return newReadableStream(vu, goja.ConstructorCall{ + Arguments: []goja.Value{rt.ToValue(underlyingSourceForReader(vu, reader))}, + This: rt.NewObject(), + }) +} + +func underlyingSourceForReader(vu modules.VU, reader io.Reader) *goja.Object { + rt := vu.Runtime() + + underlyingSource := vu.Runtime().NewObject() + if err := underlyingSource.Set("pull", rt.ToValue(func(controller *goja.Object) *goja.Promise { + // Prepare methods + cClose, _ := goja.AssertFunction(controller.Get("close")) + cEnqueue, _ := goja.AssertFunction(controller.Get("enqueue")) + + buf := make([]byte, 1024) + n, err := reader.Read(buf) + if err != nil && !errors.Is(err, io.EOF) { + panic(err) + } + + _, enqueueErr := cEnqueue(nil, rt.ToValue(string(buf[:n]))) + if enqueueErr != nil { + panic(enqueueErr) + } + + if err == io.EOF { + _, closeErr := cClose(nil) + if closeErr != nil { + panic(closeErr) + } + } + + return newResolvedPromise(vu, goja.Undefined()) + })); err != nil { + throw(rt, err) + } + + return underlyingSource +}