Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ReadableStream support to k6 #3696

Merged
merged 22 commits into from
Apr 29, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
d7fa621
Add base implementation of ReadableStream (Streams API)
joanlopez Apr 19, 2024
b5e0124
Base tooling for Web Platform Tests
joanlopez Apr 19, 2024
bed700b
Web Platform Tests for ReadableStream
joanlopez Apr 19, 2024
5461ef1
Expose k6/experimental/streams
joanlopez Apr 19, 2024
d546366
Add k6/experimental/streams example
joanlopez Apr 19, 2024
347a26d
Chore: Fix linter errors
joanlopez Apr 19, 2024
b881b97
Resolve import cycle
joanlopez Apr 19, 2024
b73c40f
Remove handmade WPTs
joanlopez Apr 24, 2024
3ecab30
Remove test specifics from shared modulestest
joanlopez Apr 24, 2024
7401502
Adjust ReadableStream WPTs to use checked out code
joanlopez Apr 24, 2024
c691261
Add GH Workflow to run WPTs for Streams
joanlopez Apr 24, 2024
60e087d
Merge remote-tracking branch 'upstream/master' into readable-stream
joanlopez Apr 24, 2024
1e06c44
Apply Pull Request suggestions
oleiade Apr 25, 2024
cf4f439
Apply Pull Request suggestions
oleiade Apr 25, 2024
31a2554
Apply code review suggestions
joanlopez Apr 25, 2024
08824fa
Merge branch 'readable-stream' of github.com:grafana/k6 into readable…
joanlopez Apr 25, 2024
2a90895
Fix Web Platform Tests execution for streams
joanlopez Apr 26, 2024
c301edc
Apply code review suggestions
joanlopez Apr 26, 2024
386dd73
Merge remote-tracking branch 'upstream/master' into readable-stream
joanlopez Apr 26, 2024
c028e12
Fix linting issue
joanlopez Apr 26, 2024
a35dad0
Apply pull request review suggestions
oleiade Apr 29, 2024
89302f2
Merge branch 'master' into readable-stream
oleiade Apr 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions examples/experimental/streams.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import { ReadableStream } from 'k6/experimental/streams'
import { setTimeout } from 'k6/timers'

function numbersStream() {
let currentNumber = 0

return new ReadableStream({
start(controller) {
const fn = () => {
if (currentNumber < 5) {
controller.enqueue(++currentNumber)
setTimeout(fn, 1000)
return;
}

controller.close()
}
setTimeout(fn, 1000)
},
})
}

export default async function () {
const stream = numbersStream()
const reader = stream.getReader()

while (true) {
const { done, value } = await reader.read()
if (done) break
console.log(`received number ${value} from stream`)
}

console.log('we are done')
mstoykov marked this conversation as resolved.
Show resolved Hide resolved
}
2 changes: 2 additions & 0 deletions js/jsmodules.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"go.k6.io/k6/js/modules/k6/encoding"
"go.k6.io/k6/js/modules/k6/execution"
"go.k6.io/k6/js/modules/k6/experimental/fs"
"go.k6.io/k6/js/modules/k6/experimental/streams"
"go.k6.io/k6/js/modules/k6/experimental/tracing"
"go.k6.io/k6/js/modules/k6/grpc"
"go.k6.io/k6/js/modules/k6/html"
Expand All @@ -38,6 +39,7 @@ func getInternalJSModules() map[string]interface{} {
"k6/timers": timers.New(),
"k6/execution": execution.New(),
"k6/experimental/redis": redis.New(),
"k6/experimental/streams": streams.New(),
"k6/experimental/webcrypto": webcrypto.New(),
"k6/experimental/websockets": &expws.RootModule{},
"k6/experimental/timers": newWarnExperimentalModule(timers.New(),
Expand Down
64 changes: 64 additions & 0 deletions js/modules/k6/experimental/streams/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package streams

import "github.com/dop251/goja"

func newError(k errorKind, message string) *streamError {
return &streamError{
Name: k.String(),
Message: message,
kind: k,
}
}

//go:generate enumer -type=errorKind -output errors_gen.go
type errorKind uint8

const (
// TypeError is thrown when an argument is not of an expected type
TypeError errorKind = iota + 1

// RangeError is thrown when an argument is not within the expected range
RangeError

// RuntimeError is thrown when an error occurs that was caused by the JS runtime
// and is not likely caused by the user, but rather the implementation.
RuntimeError

// AssertionError is thrown when an assertion fails
AssertionError
)

type streamError struct {
// Name contains the name of the error
Name string `json:"name"`

// Message contains the error message
Message string `json:"message"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We had an internal discussion about this some time ago, but I see that you are not using the proposed solution of calling the already made by goja TypeError and RangeError constructors. This seems to require changes to the tests as that breaks instanceof in them.

a quick check shows that

	runtime := modulestest.NewRuntime(t)
	rt := runtime.VU.RuntimeField
	rangeErrorValue := rt.Get("RangeError")
	constructor, ok := goja.AssertConstructor(rangeErrorValue)

	_ = ok
	e, err := constructor(nil, rt.ToValue("some message"))
	require.NoError(t, err)

	_ = runtime.VU.Runtime().Set("something", e)
	_, err = runtime.RunOnEventLoop(`
		if (!(globalThis.something instanceof RangeError)) {
			throw "failure"
		}
	`)
	require.NoError(t, err)

works.

This isn't really blocking, but not using the goja error might be a problem in the future, so if there is a reason this wasn't done it will be nice to know about it.

on the remaining errors:

AssertionError seems to be used in cases where asserts are checked. Which I would argue isn't really necessary as all the asserts I have seen are in practice tests for if the implementation has internal inconsistencies - so panicking outright on those is likely okay, or just ignoring them. Error is not bad, I just would probably use GoError

RuntimeError seems to be another error you've made up that seems to be for everything not fitting the rest.

Again nothing bad with it ... but I am not certain you shouldn't just use the Error constructor or GoError for this one as well 🤷

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a really good idea! I may give it a try, I fully agree that, despite not blocking (most likely), it will probably make our lives easier in the long term

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It has been trickier than I initially thought, because sometimes we need to return these errors as Go errors, or keep them stored in-memory and return/throw them later. So, we need to either convert every signature into any instead of error, which would be messy, or un/wrap these errors, which is... meh, but feasible.

That said, I feel like like I'm having low inspiration these days, so if you have any other better approach, suggestions will be more than welcome.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I concur with the points brought up by @joanlopez, refactoring the error mechanism here would be quite herculean. I would also much rather do that as a separate task, and even use this as an opportunity to return more standard/conventional JS errors to the runtime :)

I'll open an issue 👍🏻


// kind contains the kind of error
kind errorKind
}

// Ensure that the fsError type implements the Go `error` interface
var _ error = (*streamError)(nil)

func (e *streamError) Error() string {
return e.Name + ":" + e.Message
}

func throw(rt *goja.Runtime, err any) {
panic(errToObj(rt, err))
}

func errToObj(rt *goja.Runtime, err any) goja.Value {
// Undefined remains undefined.
if goja.IsUndefined(rt.ToValue(err)) {
return rt.ToValue(err)
}

if e, ok := err.(*goja.Exception); ok {
return e.Value().ToObject(rt)
}

return rt.ToValue(err).ToObject(rt)
}
87 changes: 87 additions & 0 deletions js/modules/k6/experimental/streams/errors_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

107 changes: 107 additions & 0 deletions js/modules/k6/experimental/streams/goja.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package streams

import (
"fmt"
"reflect"

"github.com/dop251/goja"
"go.k6.io/k6/js/common"
"go.k6.io/k6/js/modules"
)

// newResolvedPromise instantiates a new resolved promise.
func newResolvedPromise(vu modules.VU, with goja.Value) *goja.Promise {
promise, resolve, _ := vu.Runtime().NewPromise()
resolve(with)
return promise
}

// newRejectedPromise instantiates a new rejected promise.
func newRejectedPromise(vu modules.VU, with any) *goja.Promise {
promise, _, reject := vu.Runtime().NewPromise()
reject(with)
return promise
}

// promiseThen facilitates instantiating a new promise and defining callbacks for to be executed
// on fulfillment as well as rejection, directly from Go.
func promiseThen(
rt *goja.Runtime,
promise *goja.Promise,
onFulfilled, onRejected func(goja.Value),
) (*goja.Promise, error) {
val, err := rt.RunString(
`(function(promise, onFulfilled, onRejected) { return promise.then(onFulfilled, onRejected) })`)
if err != nil {
return nil, newError(RuntimeError, "unable to initialize promiseThen internal helper function")
}

cal, ok := goja.AssertFunction(val)
if !ok {
return nil, newError(RuntimeError, "the internal promiseThen helper is not a function")
}

if onRejected == nil {
val, err = cal(goja.Undefined(), rt.ToValue(promise), rt.ToValue(onFulfilled))
} else {
val, err = cal(goja.Undefined(), rt.ToValue(promise), rt.ToValue(onFulfilled), rt.ToValue(onRejected))
}

if err != nil {
return nil, err
}

newPromise, ok := val.Export().(*goja.Promise)
if !ok {
return nil, newError(RuntimeError, "unable to cast the internal promiseThen helper's return value to a promise")
}

return newPromise, nil
}

// isNumber returns true if the given goja value holds a number
func isNumber(value goja.Value) bool {
_, isFloat := value.Export().(float64)
_, isInt := value.Export().(int64)

return isFloat || isInt
}

// isNonNegativeNumber implements the [IsNonNegativeNumber] algorithm.
//
// [IsNonNegativeNumber]: https://streams.spec.whatwg.org/#is-non-negative-number
func isNonNegativeNumber(value goja.Value) bool {
if common.IsNullish(value) {
return false
}

if !isNumber(value) {
return false
}

if value.ToFloat() < 0 || value.ToInteger() < 0 {
return false
}

return true
}

// setReadOnlyPropertyOf sets a read-only property on the given [goja.Object].
func setReadOnlyPropertyOf(obj *goja.Object, name string, value goja.Value) error {
err := obj.DefineDataProperty(name,
value,
goja.FLAG_FALSE,
goja.FLAG_FALSE,
goja.FLAG_TRUE,
)
if err != nil {
return fmt.Errorf("unable to define %s read-only property on TextEncoder object; reason: %w", name, err)
oleiade marked this conversation as resolved.
Show resolved Hide resolved
}

return nil
}

// isObject determines whether the given [goja.Value] is a [goja.Object] or not.
func isObject(val goja.Value) bool {
return val != nil && val.ExportType() != nil && val.ExportType().Kind() == reflect.Map
}
Loading