Commit

Address potential races in fs.read and fs.seek operations (#3420)
oleiade authored Nov 1, 2023
1 parent 0a4f80b commit efdd43f
Showing 4 changed files with 155 additions and 50 deletions.
59 changes: 37 additions & 22 deletions js/modules/k6/experimental/fs/file.go
@@ -3,6 +3,9 @@ package fs
import (
"io"
"path/filepath"
"sync/atomic"

"go.k6.io/k6/lib"
)

// file is an abstraction for interacting with files.
@@ -13,7 +16,11 @@ type file struct {
data []byte

// offset holds the current offset in the file
offset int
//
// TODO: using an atomic here does not guarantee ordering of reads and seeks, and leaves
// the behavior not strictly defined. This is something we might want to address in the future, and
// is tracked as part of #3433.
offset atomic.Int64
}

// Stat returns a FileInfo describing the named file.
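For context on the hunk above: a plain int field that is read and written from more than one goroutine is a data race under the Go memory model, which is what the race detector reports and what the switch to atomic.Int64 addresses. The standalone sketch below (hypothetical types, not part of the commit) illustrates the difference, including the ordering caveat the TODO refers to: each Load/Store is race-free on its own, but a Load followed by a Store is still not a single atomic step.

// Illustrative sketch, not part of the commit: concurrent access to a plain
// field is a data race, while atomic.Int64 makes each Load/Store race-free
// (though a Load-then-Store sequence is still not atomic as a whole, which is
// the ordering caveat tracked in #3433).
package main

import (
    "sync"
    "sync/atomic"
)

type racyOffset struct{ offset int64 }        // plain field: concurrent access is a data race
type safeOffset struct{ offset atomic.Int64 } // atomic field: Load/Store are race-free

func main() {
    var wg sync.WaitGroup
    racy, safe := &racyOffset{}, &safeOffset{}

    for i := 0; i < 2; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            racy.offset += 10                          // reported by the race detector (go test -race)
            safe.offset.Store(safe.offset.Load() + 10) // no race report, but not atomic as a whole
        }()
    }
    wg.Wait()
}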
@@ -38,24 +45,27 @@ type FileInfo struct {
//
// If the end of the file has been reached, it returns EOFError.
func (f *file) Read(into []byte) (n int, err error) {
start := f.offset
if start == len(f.data) {
currentOffset := f.offset.Load()
fileSize := f.size()

// Check if we have reached the end of the file
if currentOffset == fileSize {
return 0, newFsError(EOFError, "EOF")
}

end := f.offset + len(into)
if end > len(f.data) {
end = len(f.data)
// We align with the [io.Reader.Read] method's behavior
// and return EOFError when we reach the end of the
// file, regardless of how much data we were able to
// read.
err = newFsError(EOFError, "EOF")
}
// Calculate the effective new offset
targetOffset := currentOffset + int64(len(into))
newOffset := lib.Min(targetOffset, fileSize)

n = copy(into, f.data[start:end])
// Read the data into the provided slice, and update
// the offset accordingly
n = copy(into, f.data[currentOffset:newOffset])
f.offset.Store(newOffset)

f.offset += n
// If we've reached or surpassed the end, set the error to EOF
if targetOffset > fileSize {
err = newFsError(EOFError, "EOF")
}

return n, err
}
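The rewritten Read keeps io.Reader's tail semantics: a read that runs past the end of the data returns the bytes it did copy together with EOFError in the same call, and a read that starts at the end returns 0 and EOFError. An illustrative fragment, written as it would appear inside the fs package (the literal file contents here are made up):

f := &file{path: "file.txt", data: []byte("hello")}

buf := make([]byte, 4)
n, err := f.Read(buf) // n == 4, err == nil; offset advanced to 4
n, err = f.Read(buf)  // n == 1 ("o"), err is EOFError: partial read at the end of the data
n, err = f.Read(buf)  // n == 0, err is EOFError: offset already equals the file size
_, _ = n, err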
@@ -71,9 +81,10 @@ var _ io.Reader = (*file)(nil)
//
// When using SeekModeStart, the offset must be positive.
// Negative offsets are allowed when using `SeekModeCurrent` or `SeekModeEnd`.
func (f *file) Seek(offset int, whence SeekMode) (int, error) {
newOffset := f.offset
func (f *file) Seek(offset int64, whence SeekMode) (int64, error) {
startingOffset := f.offset.Load()

newOffset := startingOffset
switch whence {
case SeekModeStart:
if offset < 0 {
@@ -88,7 +99,7 @@ func (f *file) Seek(offset int, whence SeekMode) (int, error) {
return 0, newFsError(TypeError, "offset cannot be positive when using SeekModeEnd")
}

newOffset = len(f.data) + offset
newOffset = f.size() + offset
default:
return 0, newFsError(TypeError, "invalid seek mode")
}
@@ -97,18 +108,18 @@ func (f *file) Seek(offset int, whence SeekMode) (int, error) {
return 0, newFsError(TypeError, "seeking before start of file")
}

if newOffset > len(f.data) {
if newOffset > f.size() {
return 0, newFsError(TypeError, "seeking beyond end of file")
}

// Note that the implementation assumes one `file` instance per file/vu.
// If that assumption was invalidated, we would need to atomically update
// the offset instead.
f.offset = newOffset
// Update the file instance's offset to the new selected position
f.offset.Store(newOffset)

return newOffset, nil
}

var _ io.Seeker = (*file)(nil)

// SeekMode is used to specify the seek mode when seeking in a file.
type SeekMode = int

@@ -125,3 +136,7 @@ const (
// the end of the file.
SeekModeEnd
)

func (f *file) size() int64 {
return int64(len(f.data))
}
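To make the Seek bounds concrete, here is an illustrative fragment inside the fs package, assuming a 100-byte file as in the tests below. A failed seek returns a TypeError before the new offset is stored, so the position is left unchanged:

f := &file{data: make([]byte, 100)}

pos, err := f.Seek(10, SeekModeStart)  // pos == 10: absolute offset from the start
pos, err = f.Seek(5, SeekModeCurrent)  // pos == 15: relative to the current offset
pos, err = f.Seek(-20, SeekModeEnd)    // pos == 80: relative to the end of the file
_, err = f.Seek(-1, SeekModeStart)     // TypeError: SeekModeStart rejects negative offsets
_, err = f.Seek(200, SeekModeCurrent)  // TypeError: 80+200 is beyond the end of the file
_, _ = pos, err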
17 changes: 9 additions & 8 deletions js/modules/k6/experimental/fs/file_test.go
@@ -18,7 +18,7 @@ func TestFileImpl(t *testing.T) {
name string
into []byte
fileData []byte
offset int
offset int64
wantInto []byte
wantN int
wantErr errorKind
@@ -105,10 +105,10 @@ func TestFileImpl(t *testing.T) {
t.Parallel()

f := &file{
path: "",
data: tc.fileData,
offset: tc.offset,
path: "",
data: tc.fileData,
}
f.offset.Store(tc.offset)

gotN, err := f.Read(tc.into)

@@ -138,16 +138,16 @@ func TestFileImpl(t *testing.T) {
t.Parallel()

type args struct {
offset int
offset int64
whence SeekMode
}

// The test file is 100 bytes long
tests := []struct {
name string
fileOffset int
fileOffset int64
args args
wantOffset int
wantOffset int64
wantError bool
}{
{
@@ -242,7 +242,8 @@ func TestFileImpl(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()

f := &file{data: make([]byte, 100), offset: tt.fileOffset}
f := &file{data: make([]byte, 100)}
f.offset.Store(tt.fileOffset)

got, err := f.Seek(tt.args.offset, tt.args.whence)
if tt.wantError {
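Since the point of the commit is to eliminate data races, the natural way to exercise it is under the race detector, e.g. go test -race ./js/modules/k6/experimental/fs/. A hedged sketch of such a test follows (not part of this commit; it would sit in file_test.go and assumes a sync import), which the pre-patch plain-int offset would be expected to fail:

func TestFileConcurrentReadAndSeek(t *testing.T) {
    t.Parallel()

    f := &file{data: make([]byte, 1024)}

    var wg sync.WaitGroup
    wg.Add(2)

    // One goroutine reads while another seeks back to the start; with a plain
    // int offset the race detector reports unsynchronized access to f.offset.
    go func() {
        defer wg.Done()
        buf := make([]byte, 16)
        for i := 0; i < 1000; i++ {
            _, _ = f.Read(buf)
        }
    }()

    go func() {
        defer wg.Done()
        for i := 0; i < 1000; i++ {
            _, _ = f.Seek(0, SeekModeStart)
        }
    }()

    wg.Wait()
}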
49 changes: 42 additions & 7 deletions js/modules/k6/experimental/fs/module.go
@@ -186,8 +186,31 @@ func (f *File) Stat() *goja.Promise {
//
// It is possible for a read to successfully return with 0 bytes.
// This does not indicate EOF.
//
//nolint:funlen
func (f *File) Read(into goja.Value) *goja.Promise {
promise, resolve, reject := promises.New(f.vu)
// This method performs an asynchronous read operation and modifies the provided Uint8Array in place.
// To ensure thread safety and avoid concurrency issues, we take special precautions when creating the promise:
//
// 1. Instead of using the standard [promises.New] method, we manually create a promise.
// 2. We register a callback to be executed by the VU's runtime. This ensures that the modification
// of the JS runtime's `buffer` occurs on the main thread during the promise's resolution.
promise, resolveFunc, rejectFunc := f.vu.Runtime().NewPromise()
callback := f.vu.RegisterCallback()

resolve := func(result any) {
callback(func() error {
resolveFunc(result)
return nil
})
}

reject := func(reason any) {
callback(func() error {
rejectFunc(reason)
return nil
})
}

if common.IsNullish(into) {
reject(newFsError(TypeError, "read() failed; reason: into cannot be null or undefined"))
@@ -209,15 +232,27 @@ func (f *File) Read(into goja.Value) *goja.Promise {
return promise
}

// Obtain the underlying byte slice from the ArrayBuffer.
// Note that this is not a copy, and will be modified by the Read operation
// in place.
buffer := ab.Bytes()
// Copy the ArrayBuffer into a byte slice, so that we can pass it to the
// [file.Read] method without risking to modify the original ArrayBuffer, and
// running into concurrency issues (data race).
intoBytes := ab.Bytes()
buffer := make([]byte, len(intoBytes))

go func() {
n, err := f.file.Read(buffer)
if err == nil {
resolve(n)
// Although the read operation happens as part of the goroutine, we
// still need to make sure that:
// 1. Any side effects, like modifying the `buffer`, are deferred and
// executed on the main thread via the registered callback.
// 2. This approach ensures that while the file read operation can proceed
// asynchronously, any side effects that might interfere with the JS runtime
// are executed in a controlled and sequential manner on the main thread.
callback(func() error {
_ = copy(intoBytes, buffer)
resolveFunc(n)
return nil
})
return
}
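The hunk above combines two ideas: the goroutine only ever touches its private buffer (a copy of the ArrayBuffer's bytes), and every effect the JS runtime can observe, namely the copy back into intoBytes and the promise settlement, is deferred through the callback obtained from RegisterCallback so it runs on the VU's event loop. A self-contained toy model of that shape is sketched below; the channel stands in for the event loop, and none of the identifiers are part of the k6 API.

// Toy model, not the k6 API: blocking work happens off the loop, observable
// side effects happen on it.
package main

import "fmt"

func main() {
    loop := make(chan func(), 1) // stand-in for the event loop's callback queue

    intoBytes := make([]byte, 5)           // stand-in for the JS-owned ArrayBuffer bytes
    buffer := make([]byte, len(intoBytes)) // private copy the goroutine is allowed to touch

    go func() {
        n := copy(buffer, "hello") // the blocking work, e.g. f.file.Read(buffer)

        loop <- func() { // deferred, like the registered callback above
            copy(intoBytes, buffer) // only the loop goroutine writes to intoBytes
            fmt.Printf("resolved with n=%d, bytes=%q\n", n, intoBytes)
        }
    }()

    (<-loop)() // the event loop eventually runs the callback
}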

@@ -287,7 +322,7 @@ func (f *File) Seek(offset goja.Value, whence goja.Value) *goja.Promise {
}

go func() {
newOffset, err := f.file.Seek(int(intOffset), seekMode)
newOffset, err := f.file.Seek(intOffset, seekMode)
if err != nil {
reject(err)
return