Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing MinObject related issues in MRDWrapper #2936

Merged
merged 9 commits into from
Jan 31, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion internal/fs/inode/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,11 @@ func NewFileInode(
unlinked: false,
config: cfg,
globalMaxWriteBlocksSem: globalMaxBlocksSem,
MRDWrapper: gcsx.NewMultiRangeDownloaderWrapper(bucket, &minObj),
}
var err error
f.MRDWrapper, err = gcsx.NewMultiRangeDownloaderWrapper(bucket, &f.src)
if err != nil {
logger.Errorf("NewFileInode: Error in creating MRDWrapper %v", err)
}

f.lc.Init(id)
Expand Down Expand Up @@ -701,6 +705,7 @@ func (f *FileInode) SetMtime(
minObj = *minObjPtr
}
f.src = minObj
f.updateMRDWrapper()
return
}

Expand Down Expand Up @@ -836,6 +841,8 @@ func (f *FileInode) Flush(ctx context.Context) (err error) {
func (f *FileInode) updateInodeStateAfterSync(minObj *gcs.MinObject) {
if minObj != nil && !f.localFileCache {
f.src = *minObj
// Update MRDWrapper
f.updateMRDWrapper()
// Convert localFile to nonLocalFile after it is synced to GCS.
if f.IsLocal() {
f.local = false
Expand All @@ -852,6 +859,15 @@ func (f *FileInode) updateInodeStateAfterSync(minObj *gcs.MinObject) {
return
}

// Updates the min object stored in MRDWrapper corresponding to the inode.
// Should be called when minObject associated with inode is updated.
func (f *FileInode) updateMRDWrapper() {
err := f.MRDWrapper.SetMinObject(&f.src)
if err != nil {
logger.Errorf("FileInode::updateMRDWrapper Error in setting minObject %v", err)
}
}

// Truncate the file to the specified size.
//
// LOCKS_REQUIRED(f.mu)
Expand Down
22 changes: 22 additions & 0 deletions internal/fs/inode/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,9 @@ func (t *FileTest) TestWriteThenSync() {
// The generation should have advanced.
assert.Less(t.T(), t.backingObj.Generation, t.in.SourceGeneration().Object)

// Validate MinObject in MRDWrapper is same as the MinObject in inode.
assert.Same(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())

// Stat the current object in the bucket.
statReq := &gcs.StatObjectRequest{Name: t.in.Name().GcsObjectName()}
m, _, err := t.bucket.StatObject(t.ctx, statReq)
Expand Down Expand Up @@ -489,6 +492,8 @@ func (t *FileTest) TestWriteToLocalFileThenSync() {
assert.Equal(t.T(),
writeTime.UTC().Format(time.RFC3339Nano),
m.Metadata["gcsfuse_mtime"])
// Validate MinObject in MRDWrapper is same as the MinObject in inode.
assert.Same(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())
// Read the object's contents.
contents, err := storageutil.ReadObject(t.ctx, t.bucket, t.in.Name().GcsObjectName())
assert.Nil(t.T(), err)
Expand Down Expand Up @@ -547,6 +552,8 @@ func (t *FileTest) TestSyncEmptyLocalFile() {
assert.Equal(t.T(), t.in.SourceGeneration().Object, m.Generation)
assert.Equal(t.T(), t.in.SourceGeneration().Metadata, m.MetaGeneration)
assert.Equal(t.T(), uint64(0), m.Size)
// Validate MinObject in MRDWrapper is same as the MinObject in inode.
assert.Same(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())
// Validate the mtime.
mtimeInBucket, ok := m.Metadata["gcsfuse_mtime"]
assert.True(t.T(), ok)
Expand Down Expand Up @@ -620,6 +627,9 @@ func (t *FileTest) TestAppendThenSync() {
writeTime.UTC().Format(time.RFC3339Nano),
m.Metadata["gcsfuse_mtime"])

// Validate MinObject in MRDWrapper is same as the MinObject in inode.
assert.Same(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())

// Read the object's contents.
contents, err := storageutil.ReadObject(t.ctx, t.bucket, t.in.Name().GcsObjectName())

Expand Down Expand Up @@ -676,6 +686,9 @@ func (t *FileTest) TestTruncateDownwardThenSync() {
// The generation should have advanced.
assert.Less(t.T(), t.backingObj.Generation, t.in.SourceGeneration().Object)

// Validate MinObject in MRDWrapper is same as the MinObject in inode.
assert.Same(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())

// Stat the current object in the bucket.
statReq := &gcs.StatObjectRequest{Name: t.in.Name().GcsObjectName()}
m, _, err := t.bucket.StatObject(t.ctx, statReq)
Expand Down Expand Up @@ -742,6 +755,9 @@ func (t *FileTest) TestTruncateUpwardThenFlush() {
// The generation should have advanced.
assert.Less(t.T(), t.backingObj.Generation, t.in.SourceGeneration().Object)

// Validate MinObject in MRDWrapper is same as the MinObject in inode.
assert.Same(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())

// Stat the current object in the bucket.
statReq := &gcs.StatObjectRequest{Name: t.in.Name().GcsObjectName()}
m, _, err := t.bucket.StatObject(t.ctx, statReq)
Expand Down Expand Up @@ -1031,6 +1047,9 @@ func (t *FileTest) TestSyncFlush_Clobbered() {
err = t.in.Flush(t.ctx)
}

// Validate MinObject in MRDWrapper is same as the MinObject in inode.
assert.Same(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())

// Check if the error is a FileClobberedError
var fcErr *gcsfuse_errors.FileClobberedError
assert.True(t.T(), errors.As(err, &fcErr), "expected FileClobberedError but got %v", err)
Expand Down Expand Up @@ -1155,6 +1174,9 @@ func (t *FileTest) TestSetMtime_ContentDirty() {
statReq := &gcs.StatObjectRequest{Name: t.in.Name().GcsObjectName()}
m, _, err := t.bucket.StatObject(t.ctx, statReq)

// Validate MinObject in MRDWrapper is same as the MinObject in inode.
assert.Same(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())

assert.Nil(t.T(), err)
assert.NotNil(t.T(), m)
assert.Equal(t.T(),
Expand Down
31 changes: 28 additions & 3 deletions internal/gcsx/multi_range_downloader_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,21 @@ import (
// it's refcount reaches 0.
const multiRangeDownloaderTimeout = 60 * time.Second

func NewMultiRangeDownloaderWrapper(bucket gcs.Bucket, object *gcs.MinObject) MultiRangeDownloaderWrapper {
func NewMultiRangeDownloaderWrapper(bucket gcs.Bucket, object *gcs.MinObject) (MultiRangeDownloaderWrapper, error) {
return NewMultiRangeDownloaderWrapperWithClock(bucket, object, clock.RealClock{})
}

func NewMultiRangeDownloaderWrapperWithClock(bucket gcs.Bucket, object *gcs.MinObject, clock clock.Clock) MultiRangeDownloaderWrapper {
func NewMultiRangeDownloaderWrapperWithClock(bucket gcs.Bucket, object *gcs.MinObject, clock clock.Clock) (MultiRangeDownloaderWrapper, error) {
if object == nil {
return MultiRangeDownloaderWrapper{}, fmt.Errorf("NewMultiRangeDownloaderWrapperWithClock: Missing MinObject")
}
// In case of a local inode, MRDWrapper would be created with an empty minObject (i.e. with a minObject without any information)
// and when the object is actually created, MRDWrapper would be updated using SetMinObject method.
return MultiRangeDownloaderWrapper{
clock: clock,
abhishek10004 marked this conversation as resolved.
Show resolved Hide resolved
bucket: bucket,
object: object,
}
}, nil
}

type readResult struct {
Expand All @@ -56,6 +61,7 @@ type MultiRangeDownloaderWrapper struct {
Wrapped gcs.MultiRangeDownloader

// Bucket and object details for MultiRangeDownloader.
// Object should not be nil.
abhishek10004 marked this conversation as resolved.
Show resolved Hide resolved
object *gcs.MinObject
bucket gcs.Bucket

Expand All @@ -69,6 +75,20 @@ type MultiRangeDownloaderWrapper struct {
clock clock.Clock
}

// Sets the gcs.MinObject stored in the wrapper to passed value, only if it's non nil.
func (mrdWrapper *MultiRangeDownloaderWrapper) SetMinObject(minObj *gcs.MinObject) error {
if minObj == nil {
return fmt.Errorf("MultiRangeDownloaderWrapper::SetMinObject: Missing MinObject")
}
mrdWrapper.object = minObj
return nil
}

// Returns the minObject stored in MultiRangeDownloaderWrapper. Used only for unit testing.
func (mrdWrapper *MultiRangeDownloaderWrapper) GetMinObject() *gcs.MinObject {
abhishek10004 marked this conversation as resolved.
Show resolved Hide resolved
return mrdWrapper.object
}

// Returns current refcount.
func (mrdWrapper *MultiRangeDownloaderWrapper) GetRefCount() int {
mrdWrapper.mu.Lock()
Expand Down Expand Up @@ -136,6 +156,10 @@ func (mrdWrapper *MultiRangeDownloaderWrapper) cleanupMultiRangeDownloader() {

// Ensures that MultiRangeDownloader exists, creating it if it does not exist.
func (mrdWrapper *MultiRangeDownloaderWrapper) ensureMultiRangeDownloader() (err error) {
if mrdWrapper.object == nil || mrdWrapper.bucket == nil {
abhishek10004 marked this conversation as resolved.
Show resolved Hide resolved
return fmt.Errorf("ensureMultiRangeDownloader error: Missing minObject or bucket")
}

if mrdWrapper.Wrapped == nil {
mrdWrapper.Wrapped, err = mrdWrapper.bucket.NewMultiRangeDownloader(context.Background(), &gcs.MultiRangeDownloaderRequest{
Name: mrdWrapper.object.Name,
Expand All @@ -158,6 +182,7 @@ func (mrdWrapper *MultiRangeDownloaderWrapper) Read(ctx context.Context, buf []b
err = mrdWrapper.ensureMultiRangeDownloader()
if err != nil {
err = fmt.Errorf("MultiRangeDownloaderWrapper::Read: Error in creating MultiRangeDownloader: %v", err)
mrdWrapper.Wrapped = nil
return
}

Expand Down
116 changes: 115 additions & 1 deletion internal/gcsx/multi_range_downloader_wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package gcsx

import (
"context"
"fmt"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -45,6 +46,7 @@ func TestMRDWrapperTestSuite(t *testing.T) {
}

func (t *mrdWrapperTest) SetupTest() {
var err error
t.object = &gcs.MinObject{
Name: "foo",
Size: 100,
Expand All @@ -54,7 +56,8 @@ func (t *mrdWrapperTest) SetupTest() {
// Create the bucket.
t.mockBucket = new(storage.TestifyMockBucket)
t.mrdTimeout = time.Millisecond
t.mrdWrapper = NewMultiRangeDownloaderWrapperWithClock(t.mockBucket, t.object, &clock.FakeClock{WaitTime: t.mrdTimeout})
t.mrdWrapper, err = NewMultiRangeDownloaderWrapperWithClock(t.mockBucket, t.object, &clock.FakeClock{WaitTime: t.mrdTimeout})
assert.Nil(t.T(), err, "Error in creating MRDWrapper")
t.mrdWrapper.Wrapped = fake.NewFakeMultiRangeDownloaderWithSleep(t.object, t.objectData, time.Microsecond)
t.mrdWrapper.refCount = 0
}
Expand Down Expand Up @@ -159,3 +162,114 @@ func (t *mrdWrapperTest) Test_Read() {
})
}
}

func (t *mrdWrapperTest) Test_NewMultiRangeDownloaderWrapper() {
testCases := []struct {
name string
bucket gcs.Bucket
obj *gcs.MinObject
err error
}{
{
name: "ValidParameters",
bucket: t.mockBucket,
obj: t.object,
err: nil,
},
{
name: "NilMinObject",
bucket: t.mockBucket,
obj: nil,
err: fmt.Errorf("NewMultiRangeDownloaderWrapperWithClock: Missing MinObject"),
},
}

for _, tc := range testCases {
t.Run(tc.name, func() {
_, err := NewMultiRangeDownloaderWrapper(tc.bucket, tc.obj)
if tc.err == nil {
assert.NoError(t.T(), err)
} else {
assert.Error(t.T(), err)
assert.EqualError(t.T(), err, tc.err.Error())
}
})
}
}

func (t *mrdWrapperTest) Test_SetMinObject() {
testCases := []struct {
name string
obj *gcs.MinObject
err error
}{
{
name: "ValidMinObject",
obj: t.object,
err: nil,
},
{
name: "NilMinObject",
obj: nil,
err: fmt.Errorf("MultiRangeDownloaderWrapper::SetMinObject: Missing MinObject"),
},
}

for _, tc := range testCases {
t.Run(tc.name, func() {
err := t.mrdWrapper.SetMinObject(tc.obj)
if tc.err == nil {
assert.NoError(t.T(), err)
} else {
assert.Error(t.T(), err)
assert.EqualError(t.T(), err, tc.err.Error())
}
})
}
}

func (t *mrdWrapperTest) Test_EnsureMultiRangeDownloader() {
testCases := []struct {
name string
obj *gcs.MinObject
bucket gcs.Bucket
err error
}{
{
name: "ValidMinObject",
obj: t.object,
bucket: t.mockBucket,
err: nil,
},
{
name: "NilMinObject",
obj: nil,
bucket: t.mockBucket,
err: fmt.Errorf("ensureMultiRangeDownloader error: Missing minObject or bucket"),
},
{
name: "NilBucket",
obj: t.object,
bucket: nil,
err: fmt.Errorf("ensureMultiRangeDownloader error: Missing minObject or bucket"),
},
}

for _, tc := range testCases {
t.Run(tc.name, func() {
t.mrdWrapper.bucket = tc.bucket
t.mrdWrapper.object = tc.obj
t.mrdWrapper.Wrapped = nil
t.mockBucket.On("NewMultiRangeDownloader", mock.Anything, mock.Anything).Return(fake.NewFakeMultiRangeDownloaderWithSleep(t.object, t.objectData, time.Microsecond))
err := t.mrdWrapper.ensureMultiRangeDownloader()
if tc.err == nil {
assert.NoError(t.T(), err)
assert.NotNil(t.T(), t.mrdWrapper.Wrapped)
} else {
assert.Error(t.T(), err)
assert.EqualError(t.T(), err, tc.err.Error())
assert.Nil(t.T(), t.mrdWrapper.Wrapped)
}
})
}
}
12 changes: 8 additions & 4 deletions internal/gcsx/random_reader_stretchr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -647,7 +647,8 @@ func (t *RandomReaderStretchrTest) Test_ReadAt_MRDRead() {
t.rr.wrapped.seeks = minSeeksForRandom + 1
t.object.Size = uint64(tc.dataSize)
testContent := testutil.GenerateRandomBytes(int(t.object.Size))
fakeMRDWrapper := NewMultiRangeDownloaderWrapperWithClock(t.mockBucket, t.object, &clock.FakeClock{})
fakeMRDWrapper, err := NewMultiRangeDownloaderWrapperWithClock(t.mockBucket, t.object, &clock.FakeClock{})
assert.Nil(t.T(), err, "Error in creating MRDWrapper")
t.rr.wrapped.mrdWrapper = &fakeMRDWrapper
t.mockBucket.On("NewMultiRangeDownloader", mock.Anything, mock.Anything).Return(fake.NewFakeMultiRangeDownloaderWithSleep(t.object, testContent, time.Microsecond)).Times(1)
t.mockBucket.On("BucketType", mock.Anything).Return(gcs.BucketType{Zonal: true}).Times(1)
Expand Down Expand Up @@ -688,7 +689,8 @@ func (t *RandomReaderStretchrTest) Test_ReadFromMultiRangeReader_ReadFull() {
t.rr.wrapped.isMRDInUse = false
t.object.Size = uint64(tc.dataSize)
testContent := testutil.GenerateRandomBytes(int(t.object.Size))
fakeMRDWrapper := NewMultiRangeDownloaderWrapperWithClock(t.mockBucket, t.object, &clock.FakeClock{})
fakeMRDWrapper, err := NewMultiRangeDownloaderWrapperWithClock(t.mockBucket, t.object, &clock.FakeClock{})
assert.Nil(t.T(), err, "Error in creating MRDWrapper")
t.rr.wrapped.mrdWrapper = &fakeMRDWrapper
t.mockBucket.On("NewMultiRangeDownloader", mock.Anything, mock.Anything).Return(fake.NewFakeMultiRangeDownloaderWithSleep(t.object, testContent, time.Microsecond)).Times(1)
t.mockBucket.On("BucketType", mock.Anything).Return(gcs.BucketType{Zonal: true}).Times(1)
Expand Down Expand Up @@ -722,7 +724,8 @@ func (t *RandomReaderStretchrTest) Test_ReadFromMultiRangeReader_ReadChunk() {
t.rr.wrapped.reader = nil
t.object.Size = uint64(tc.dataSize)
testContent := testutil.GenerateRandomBytes(int(t.object.Size))
fakeMRDWrapper := NewMultiRangeDownloaderWrapperWithClock(t.mockBucket, t.object, &clock.FakeClock{})
fakeMRDWrapper, err := NewMultiRangeDownloaderWrapperWithClock(t.mockBucket, t.object, &clock.FakeClock{})
assert.Nil(t.T(), err, "Error in creating MRDWrapper")
t.rr.wrapped.mrdWrapper = &fakeMRDWrapper
t.mockBucket.On("NewMultiRangeDownloader", mock.Anything, mock.Anything).Return(fake.NewFakeMultiRangeDownloaderWithSleep(t.object, testContent, time.Microsecond)).Times(1)
t.mockBucket.On("BucketType", mock.Anything).Return(gcs.BucketType{Zonal: true}).Times(1)
Expand Down Expand Up @@ -761,7 +764,8 @@ func (t *RandomReaderStretchrTest) Test_ReadFromMultiRangeReader_ValidateTimeout
t.rr.wrapped.isMRDInUse = false
t.object.Size = uint64(tc.dataSize)
testContent := testutil.GenerateRandomBytes(int(t.object.Size))
fakeMRDWrapper := NewMultiRangeDownloaderWrapperWithClock(t.mockBucket, t.object, &clock.FakeClock{})
fakeMRDWrapper, err := NewMultiRangeDownloaderWrapperWithClock(t.mockBucket, t.object, &clock.FakeClock{})
assert.Nil(t.T(), err, "Error in creating MRDWrapper")
t.rr.wrapped.mrdWrapper = &fakeMRDWrapper
t.mockBucket.On("NewMultiRangeDownloader", mock.Anything, mock.Anything).Return(fake.NewFakeMultiRangeDownloaderWithSleep(t.object, testContent, tc.sleepTime)).Times(1)
t.mockBucket.On("BucketType", mock.Anything).Return(gcs.BucketType{Zonal: true}).Times(1)
Expand Down
Loading