Skip to content

Commit

Permalink
Support to init a ReadableStream from a io.Reader (#3740)
Browse files Browse the repository at this point in the history
* Support to init a ReadableStream from a io.Reader

* streams.NewReadableStreamForReader test

* Fix linter complains

* Apply suggestions from code review

* Replace goja => sobek
  • Loading branch information
joanlopez authored Jun 7, 2024
1 parent e5b00db commit c5f57cc
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 20 deletions.
91 changes: 71 additions & 20 deletions js/modules/k6/experimental/streams/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
package streams

import (
"errors"
"io"

"github.com/grafana/sobek"
"go.k6.io/k6/js/common"
"go.k6.io/k6/js/modules"
Expand Down Expand Up @@ -46,21 +49,25 @@ func (mi *ModuleInstance) Exports() modules.Exports {

// NewReadableStream is the constructor for the ReadableStream object.
func (mi *ModuleInstance) NewReadableStream(call sobek.ConstructorCall) *sobek.Object {
rt := mi.vu.Runtime()
var err error

// 1. If underlyingSource is missing, set it to null.
var underlyingSource *sobek.Object
return newReadableStream(mi.vu, call)
}

func newReadableStream(vu modules.VU, call sobek.ConstructorCall) *sobek.Object {
var (
// 1. If underlyingSource is missing, set it to null.
underlyingSource *sobek.Object

rt = vu.Runtime()

err error
strategy *sobek.Object
underlyingSourceDict UnderlyingSource
)

// We look for the queuing strategy first, and validate it before
// the underlying source, in order to pass the Web Platform Tests
// constructor tests.
strategy = mi.initializeStrategy(call)
strategy = initializeStrategy(rt, call)

// 2. Let underlyingSourceDict be underlyingSource, converted to an IDL value of type UnderlyingSource.
if len(call.Arguments) > 0 && !sobek.IsUndefined(call.Arguments[0]) {
Expand All @@ -79,8 +86,8 @@ func (mi *ModuleInstance) NewReadableStream(call sobek.ConstructorCall) *sobek.O

// 3. Perform ! InitializeReadableStream(this).
stream := &ReadableStream{
runtime: mi.vu.Runtime(),
vu: mi.vu,
runtime: rt,
vu: vu,
}
stream.initialize()

Expand Down Expand Up @@ -129,27 +136,25 @@ func (mi *ModuleInstance) NewReadableStream(call sobek.ConstructorCall) *sobek.O
}
func defaultSizeFunc(_ sobek.Value) (float64, error) { return 1.0, nil }

func (mi *ModuleInstance) initializeStrategy(call sobek.ConstructorCall) *sobek.Object {
runtime := mi.vu.Runtime()

func initializeStrategy(rt *sobek.Runtime, call sobek.ConstructorCall) *sobek.Object {
// Either if the strategy is not provided or if it doesn't have a 'highWaterMark',
// we need to set its default value (highWaterMark=1).
// https://streams.spec.whatwg.org/#rs-prototype
strArg := runtime.NewObject()
strArg := rt.NewObject()
if len(call.Arguments) > 1 && !common.IsNullish(call.Arguments[1]) {
strArg = call.Arguments[1].ToObject(runtime)
strArg = call.Arguments[1].ToObject(rt)
}
if common.IsNullish(strArg.Get("highWaterMark")) {
if err := strArg.Set("highWaterMark", runtime.ToValue(1)); err != nil {
common.Throw(runtime, newError(RuntimeError, err.Error()))
if err := strArg.Set("highWaterMark", rt.ToValue(1)); err != nil {
common.Throw(rt, newError(RuntimeError, err.Error()))
}
}

// If the stream type is 'bytes', we don't want the size function.
// Except, when it is manually specified.
size := runtime.ToValue(defaultSizeFunc)
size := rt.ToValue(defaultSizeFunc)
if len(call.Arguments) > 0 && !common.IsNullish(call.Arguments[0]) {
srcArg := call.Arguments[0].ToObject(runtime)
srcArg := call.Arguments[0].ToObject(rt)
srcTypeArg := srcArg.Get("type")
if !common.IsNullish(srcTypeArg) && srcTypeArg.String() == ReadableStreamTypeBytes {
size = nil
Expand All @@ -160,7 +165,7 @@ func (mi *ModuleInstance) initializeStrategy(call sobek.ConstructorCall) *sobek.
}

strCall := sobek.ConstructorCall{Arguments: []sobek.Value{strArg}}
return mi.newCountQueuingStrategy(runtime, strCall, size)
return newCountQueuingStrategy(rt, strCall, size)
}

// NewCountQueuingStrategy is the constructor for the [CountQueuingStrategy] object.
Expand All @@ -170,14 +175,14 @@ func (mi *ModuleInstance) NewCountQueuingStrategy(call sobek.ConstructorCall) *s
rt := mi.vu.Runtime()
// By default, the CountQueuingStrategy has a pre-defined 'size' property.
// It cannot be overwritten by the user.
return mi.newCountQueuingStrategy(rt, call, rt.ToValue(defaultSizeFunc))
return newCountQueuingStrategy(rt, call, rt.ToValue(defaultSizeFunc))
}

// newCountQueuingStrategy is the underlying constructor for the [CountQueuingStrategy] object.
//
// It allows to create a CountQueuingStrategy with or without the 'size' property,
// depending on how the containing ReadableStream is initialized.
func (mi *ModuleInstance) newCountQueuingStrategy(
func newCountQueuingStrategy(
rt *sobek.Runtime,
call sobek.ConstructorCall,
size sobek.Value,
Expand Down Expand Up @@ -285,3 +290,49 @@ func (mi *ModuleInstance) NewReadableStreamDefaultReader(call sobek.ConstructorC

return object
}

// NewReadableStreamFromReader is the equivalent of [NewReadableStreamDefaultReader] but to initialize
// a new [ReadableStream] from a given [io.Reader] in Go code.
// It is useful for those situations when a [io.Reader] needs to be surfaced up to the JS runtime.
func NewReadableStreamFromReader(vu modules.VU, reader io.Reader) *sobek.Object {
rt := vu.Runtime()
return newReadableStream(vu, sobek.ConstructorCall{
Arguments: []sobek.Value{rt.ToValue(underlyingSourceForReader(vu, reader))},
This: rt.NewObject(),
})
}

func underlyingSourceForReader(vu modules.VU, reader io.Reader) *sobek.Object {
rt := vu.Runtime()

underlyingSource := vu.Runtime().NewObject()
if err := underlyingSource.Set("pull", rt.ToValue(func(controller *sobek.Object) *sobek.Promise {
// Prepare methods
cClose, _ := sobek.AssertFunction(controller.Get("close"))
cEnqueue, _ := sobek.AssertFunction(controller.Get("enqueue"))

buf := make([]byte, 1024)
n, err := reader.Read(buf)
if err != nil && !errors.Is(err, io.EOF) {
panic(err)
}

_, enqueueErr := cEnqueue(nil, rt.ToValue(string(buf[:n])))
if enqueueErr != nil {
panic(enqueueErr)
}

if err == io.EOF {
_, closeErr := cClose(nil)
if closeErr != nil {
panic(closeErr)
}
}

return newResolvedPromise(vu, sobek.Undefined())
})); err != nil {
throw(rt, err)
}

return underlyingSource
}
41 changes: 41 additions & 0 deletions js/modules/k6/experimental/streams/module_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package streams

import (
"bytes"
"testing"

"github.com/grafana/sobek"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.k6.io/k6/js/modulestest"
)

func TestNewReadableStreamForReader(t *testing.T) {
t.Parallel()

// The value to be streamed.
exp := "Hello, World!"

// We initialize the runtime, with the ReadableStream(rs) accessible in JS.
r := modulestest.NewRuntime(t)
rs := NewReadableStreamFromReader(r.VU, bytes.NewReader([]byte(exp)))
require.NoError(t, r.VU.Runtime().Set("rs", rs))

// Then, we run some JS code that reads from the ReadableStream(rs).
var ret sobek.Value
err := r.EventLoop.Start(func() (err error) {
ret, err = r.VU.Runtime().RunString(`(async () => {
const reader = rs.getReader();
const {value} = await reader.read();
return value;
})()`)
return err
})
assert.NoError(t, err)

// Finally, we expect the returned promise to resolve
// to the expected value (the one we streamed).
p, ok := ret.Export().(*sobek.Promise)
require.True(t, ok)
assert.Equal(t, exp, p.Result().String())
}

0 comments on commit c5f57cc

Please sign in to comment.