Skip to content

Commit

Permalink
Integrate UploadHandler in BufferedWriteHandler struct (#2670)
Browse files Browse the repository at this point in the history
* temp commit

* revert buffered write handler changes

* buffered write handler changes to integrate upload handler

* review comments and rebase changes
  • Loading branch information
ashmeenkaur authored Nov 20, 2024
1 parent cac0ace commit a55f174
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 12 deletions.
29 changes: 19 additions & 10 deletions internal/bufferedwrites/buffered_write_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"time"

"github.com/googlecloudplatform/gcsfuse/v2/internal/block"
"github.com/googlecloudplatform/gcsfuse/v2/internal/storage/gcs"
"golang.org/x/sync/semaphore"
)

Expand All @@ -29,8 +30,9 @@ import (
// BufferedWriteHandler is responsible for filling up the buffers with the data
// as it receives and handing over to uploadHandler which uploads to GCS.
type BufferedWriteHandler struct {
current block.Block
blockPool *block.BlockPool
current block.Block
blockPool *block.BlockPool
uploadHandler *UploadHandler
// Total size of data buffered so far. Some part of buffered data might have
// been uploaded to GCS as well.
totalSize int64
Expand All @@ -45,17 +47,18 @@ type WriteFileInfo struct {
}

// NewBWHandler creates the bufferedWriteHandler struct.
func NewBWHandler(blockSize int64, maxBlocks int64, globalMaxBlocksSem *semaphore.Weighted) (bwh *BufferedWriteHandler, err error) {
func NewBWHandler(objectName string, bucket gcs.Bucket, blockSize int64, maxBlocks int64, globalMaxBlocksSem *semaphore.Weighted) (bwh *BufferedWriteHandler, err error) {
bp, err := block.NewBlockPool(blockSize, maxBlocks, globalMaxBlocksSem)
if err != nil {
return
}

bwh = &BufferedWriteHandler{
current: nil,
blockPool: bp,
totalSize: 0,
mtime: time.Now(),
current: nil,
blockPool: bp,
uploadHandler: newUploadHandler(objectName, bucket, bp.FreeBlocksChannel(), blockSize),
totalSize: 0,
mtime: time.Now(),
}
return
}
Expand Down Expand Up @@ -88,7 +91,7 @@ func (wh *BufferedWriteHandler) Write(data []byte, offset int64) (err error) {
dataWritten += bytesToCopy

if wh.current.Size() == wh.blockPool.BlockSize() {
// TODO: err = trigger upload
err := wh.uploadHandler.Upload(wh.current)
if err != nil {
return err
}
Expand All @@ -108,8 +111,14 @@ func (wh *BufferedWriteHandler) Sync() (err error) {

// Flush finalizes the upload.
func (wh *BufferedWriteHandler) Flush() (err error) {
// TODO: Will be added after uploadHandler changes are done.
return fmt.Errorf("not implemented")
if wh.current != nil {
err := wh.uploadHandler.Upload(wh.current)
if err != nil {
return err
}
wh.current = nil
}
return wh.uploadHandler.Finalize()
}

// SetMtime stores the mtime with the bufferedWriteHandler.
Expand Down
33 changes: 32 additions & 1 deletion internal/bufferedwrites/buffered_write_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ import (
"testing"
"time"

"github.com/googlecloudplatform/gcsfuse/v2/internal/storage/fake"
"github.com/googlecloudplatform/gcsfuse/v2/internal/storage/gcs"
"github.com/jacobsa/timeutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
Expand All @@ -35,13 +38,15 @@ func TestBufferedWriteTestSuite(t *testing.T) {
}

func (testSuite *BufferedWriteTest) SetupTest() {
bwh, err := NewBWHandler(1024, 10, semaphore.NewWeighted(10))
bucket := fake.NewFakeBucket(timeutil.RealClock(), "FakeBucketName", gcs.NonHierarchical)
bwh, err := NewBWHandler("testObject", bucket, 1024, 10, semaphore.NewWeighted(10))
require.Nil(testSuite.T(), err)
testSuite.bwh = bwh
}

func (testSuite *BufferedWriteTest) TestSetMTime() {
testTime := time.Now()

testSuite.bwh.SetMtime(testTime)

assert.Equal(testSuite.T(), testTime, testSuite.bwh.WriteFileInfo().Mtime)
Expand Down Expand Up @@ -69,6 +74,7 @@ func (testSuite *BufferedWriteTest) TestWriteWithEmptyBuffer() {
func (testSuite *BufferedWriteTest) TestWriteEqualToBlockSize() {
size := 1024
data := strings.Repeat("A", size)

err := testSuite.bwh.Write([]byte(data), 0)

require.Nil(testSuite.T(), err)
Expand All @@ -80,10 +86,35 @@ func (testSuite *BufferedWriteTest) TestWriteEqualToBlockSize() {
func (testSuite *BufferedWriteTest) TestWriteDataSizeGreaterThanBlockSize() {
size := 2000
data := strings.Repeat("A", size)

err := testSuite.bwh.Write([]byte(data), 0)

require.Nil(testSuite.T(), err)
fileInfo := testSuite.bwh.WriteFileInfo()
assert.Equal(testSuite.T(), testSuite.bwh.mtime, fileInfo.Mtime)
assert.Equal(testSuite.T(), int64(size), fileInfo.TotalSize)
}

func (testSuite *BufferedWriteTest) TestFlushWithNonNilCurrentBlock() {
err := testSuite.bwh.Write([]byte("hi"), 0)
currentBlock := testSuite.bwh.current
require.Nil(testSuite.T(), err)

err = testSuite.bwh.Flush()

require.NoError(testSuite.T(), err)
assert.Equal(testSuite.T(), nil, testSuite.bwh.current)
// The current block should be available on the free channel as flush triggers
// an upload before finalize.
freeCh := testSuite.bwh.blockPool.FreeBlocksChannel()
got := <-freeCh
assert.Equal(testSuite.T(), &currentBlock, &got)
}

func (testSuite *BufferedWriteTest) TestFlushWithNilCurrentBlock() {
require.Nil(testSuite.T(), testSuite.bwh.current)

err := testSuite.bwh.Flush()

assert.NoError(testSuite.T(), err)
}
2 changes: 1 addition & 1 deletion internal/fs/inode/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -685,7 +685,7 @@ func (f *FileInode) CreateEmptyTempFile() (err error) {
func (f *FileInode) writeToBuffer(data []byte, offset int64) (err error) {
// Initialize bufferedWriteHandler if not done already.
if f.bwh == nil {
f.bwh, err = bufferedwrites.NewBWHandler(f.writeConfig.BlockSizeMb, f.writeConfig.MaxBlocksPerFile, f.globalMaxBlocksSem)
f.bwh, err = bufferedwrites.NewBWHandler(f.name.GcsObjectName(), f.bucket, f.writeConfig.BlockSizeMb, f.writeConfig.MaxBlocksPerFile, f.globalMaxBlocksSem)
if err != nil {
return fmt.Errorf("failed to create bufferedWriteHandler: %w", err)
}
Expand Down

0 comments on commit a55f174

Please sign in to comment.