Skip to content

Commit

Permalink
Support to init a ReadableStream from a io.Reader
Browse files Browse the repository at this point in the history
  • Loading branch information
joanlopez committed May 14, 2024
1 parent 153e0e5 commit 541c814
Showing 1 changed file with 65 additions and 16 deletions.
81 changes: 65 additions & 16 deletions js/modules/k6/experimental/streams/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
package streams

import (
"errors"
"io"

"github.com/dop251/goja"
"go.k6.io/k6/js/common"
"go.k6.io/k6/js/modules"
Expand Down Expand Up @@ -46,7 +49,12 @@ func (mi *ModuleInstance) Exports() modules.Exports {

// NewReadableStream is the constructor for the ReadableStream object.
func (mi *ModuleInstance) NewReadableStream(call goja.ConstructorCall) *goja.Object {
rt := mi.vu.Runtime()
return newReadableStream(mi.vu, call)
}

func newReadableStream(vu modules.VU, call goja.ConstructorCall) *goja.Object {

Check failure on line 55 in js/modules/k6/experimental/streams/module.go

View workflow job for this annotation

GitHub Actions / lint

Function 'newReadableStream' is too long (81 > 80) (funlen)
rt := vu.Runtime()

var err error

// 1. If underlyingSource is missing, set it to null.
Expand All @@ -60,7 +68,7 @@ func (mi *ModuleInstance) NewReadableStream(call goja.ConstructorCall) *goja.Obj
// We look for the queuing strategy first, and validate it before
// the underlying source, in order to pass the Web Platform Tests
// constructor tests.
strategy = mi.initializeStrategy(call)
strategy = initializeStrategy(rt, call)

// 2. Let underlyingSourceDict be underlyingSource, converted to an IDL value of type UnderlyingSource.
if len(call.Arguments) > 0 && !goja.IsUndefined(call.Arguments[0]) {
Expand All @@ -79,8 +87,8 @@ func (mi *ModuleInstance) NewReadableStream(call goja.ConstructorCall) *goja.Obj

// 3. Perform ! InitializeReadableStream(this).
stream := &ReadableStream{
runtime: mi.vu.Runtime(),
vu: mi.vu,
runtime: rt,
vu: vu,
}
stream.initialize()

Expand Down Expand Up @@ -129,27 +137,25 @@ func (mi *ModuleInstance) NewReadableStream(call goja.ConstructorCall) *goja.Obj
}
func defaultSizeFunc(_ goja.Value) (float64, error) { return 1.0, nil }

func (mi *ModuleInstance) initializeStrategy(call goja.ConstructorCall) *goja.Object {
runtime := mi.vu.Runtime()

func initializeStrategy(rt *goja.Runtime, call goja.ConstructorCall) *goja.Object {
// Either if the strategy is not provided or if it doesn't have a 'highWaterMark',
// we need to set its default value (highWaterMark=1).
// https://streams.spec.whatwg.org/#rs-prototype
strArg := runtime.NewObject()
strArg := rt.NewObject()
if len(call.Arguments) > 1 && !common.IsNullish(call.Arguments[1]) {
strArg = call.Arguments[1].ToObject(runtime)
strArg = call.Arguments[1].ToObject(rt)
}
if common.IsNullish(strArg.Get("highWaterMark")) {
if err := strArg.Set("highWaterMark", runtime.ToValue(1)); err != nil {
common.Throw(runtime, newError(RuntimeError, err.Error()))
if err := strArg.Set("highWaterMark", rt.ToValue(1)); err != nil {
common.Throw(rt, newError(RuntimeError, err.Error()))
}
}

// If the stream type is 'bytes', we don't want the size function.
// Except, when it is manually specified.
size := runtime.ToValue(defaultSizeFunc)
size := rt.ToValue(defaultSizeFunc)
if len(call.Arguments) > 0 && !common.IsNullish(call.Arguments[0]) {
srcArg := call.Arguments[0].ToObject(runtime)
srcArg := call.Arguments[0].ToObject(rt)
srcTypeArg := srcArg.Get("type")
if !common.IsNullish(srcTypeArg) && srcTypeArg.String() == ReadableStreamTypeBytes {
size = nil
Expand All @@ -160,7 +166,7 @@ func (mi *ModuleInstance) initializeStrategy(call goja.ConstructorCall) *goja.Ob
}

strCall := goja.ConstructorCall{Arguments: []goja.Value{strArg}}
return mi.newCountQueuingStrategy(runtime, strCall, size)
return newCountQueuingStrategy(rt, strCall, size)
}

// NewCountQueuingStrategy is the constructor for the [CountQueuingStrategy] object.
Expand All @@ -170,14 +176,14 @@ func (mi *ModuleInstance) NewCountQueuingStrategy(call goja.ConstructorCall) *go
rt := mi.vu.Runtime()
// By default, the CountQueuingStrategy has a pre-defined 'size' property.
// It cannot be overwritten by the user.
return mi.newCountQueuingStrategy(rt, call, rt.ToValue(defaultSizeFunc))
return newCountQueuingStrategy(rt, call, rt.ToValue(defaultSizeFunc))
}

// newCountQueuingStrategy is the underlying constructor for the [CountQueuingStrategy] object.
//
// It allows to create a CountQueuingStrategy with or without the 'size' property,
// depending on how the containing ReadableStream is initialized.
func (mi *ModuleInstance) newCountQueuingStrategy(
func newCountQueuingStrategy(
rt *goja.Runtime,
call goja.ConstructorCall,
size goja.Value,
Expand Down Expand Up @@ -285,3 +291,46 @@ func (mi *ModuleInstance) NewReadableStreamDefaultReader(call goja.ConstructorCa

return object
}

func NewReadableStreamForReader(vu modules.VU, reader io.Reader) *goja.Object {

Check warning on line 295 in js/modules/k6/experimental/streams/module.go

View workflow job for this annotation

GitHub Actions / lint

exported: exported function NewReadableStreamForReader should have comment or be unexported (revive)
rt := vu.Runtime()
return newReadableStream(vu, goja.ConstructorCall{
Arguments: []goja.Value{rt.ToValue(underlyingSourceForReader(vu, reader))},
This: rt.NewObject(),
})
}

func underlyingSourceForReader(vu modules.VU, reader io.Reader) *goja.Object {
rt := vu.Runtime()

underlyingSource := vu.Runtime().NewObject()
if err := underlyingSource.Set("pull", rt.ToValue(func(controller *goja.Object) *goja.Promise {
// Prepare methods
cClose, _ := goja.AssertFunction(controller.Get("close"))
cEnqueue, _ := goja.AssertFunction(controller.Get("enqueue"))

buf := make([]byte, 1024)
n, err := reader.Read(buf)
if err != nil && !errors.Is(err, io.EOF) {
panic(err)
}

_, enqueueErr := cEnqueue(nil, rt.ToValue(string(buf[:n])))
if enqueueErr != nil {
panic(enqueueErr)
}

if err == io.EOF {
_, closeErr := cClose(nil)
if closeErr != nil {
panic(closeErr)
}
}

return newResolvedPromise(vu, goja.Undefined())
})); err != nil {
throw(rt, err)
}

return underlyingSource
}

0 comments on commit 541c814

Please sign in to comment.