Skip to content

Commit

Permalink
Added block interface and memory block implementations. (#2600)
Browse files Browse the repository at this point in the history
* Added block interface and memory block implementations.

* Fixing lint
  • Loading branch information
vadlakondaswetha authored Oct 17, 2024
1 parent 0a0cfa2 commit 21633e8
Show file tree
Hide file tree
Showing 2 changed files with 200 additions and 0 deletions.
78 changes: 78 additions & 0 deletions internal/block/block.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package block

import (
"bytes"
"fmt"
"io"
)

// Block represents the buffer which holds the data.
type Block interface {
// Reuse resets the blocks for reuse.
Reuse()

// Size provides the current data size of the block. The capacity of the block
// can be >= data_size.
Size() int64

// Write writes the given data to block.
Write(bytes []byte) error

// Reader interface helps in copying the data directly to storage.writer
// while uploading to GCS.
Reader() io.Reader
}

// TODO: check if we need offset or just storing end is sufficient. We might need
// for handling ordered writes. It will be decided after ordered writes design.
type offset struct {
start, end int64
}

type memoryBlock struct {
Block
buffer []byte
offset offset
}

func (m *memoryBlock) Reuse() {
clear(m.buffer)

m.offset.end = 0
m.offset.start = 0
}

func (m *memoryBlock) Size() int64 {
return m.offset.end - m.offset.start
}
func (m *memoryBlock) Write(bytes []byte) error {
if m.Size()+int64(len(bytes)) > int64(cap(m.buffer)) {
return fmt.Errorf("received data more than capacity of the block")
}

n := copy(m.buffer[m.offset.end:], bytes)
if n != len(bytes) {
return fmt.Errorf("error in copying the data to block. Expected %d, got %d", len(bytes), n)
}

m.offset.end += int64(len(bytes))
return nil
}

func (m *memoryBlock) Reader() io.Reader {
return bytes.NewReader(m.buffer[0:m.offset.end])
}
122 changes: 122 additions & 0 deletions internal/block/block_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package block

import (
"io"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
)

const outOfCapacityError string = "received data more than capacity of the block"

type MemoryBlockTest struct {
suite.Suite
}

func TestMemoryBlockTestSuite(t *testing.T) {
suite.Run(t, new(MemoryBlockTest))
}

func createBlock(size uint32) Block {
mb := memoryBlock{
buffer: make([]byte, size),
offset: offset{0, 0},
}

return &mb
}

func (testSuite *MemoryBlockTest) TestMemoryBlockWrite() {
mb := createBlock(12)
content := []byte("hi")
err := mb.Write(content)

assert.Nil(testSuite.T(), err)
output, err := io.ReadAll(mb.Reader())
assert.Nil(testSuite.T(), err)
assert.Equal(testSuite.T(), content, output)
assert.Equal(testSuite.T(), int64(2), mb.Size())
}

func (testSuite *MemoryBlockTest) TestMemoryBlockWriteWithDataGreaterThanCapacity() {
mb := createBlock(1)
content := []byte("hi")
err := mb.Write(content)

assert.NotNil(testSuite.T(), err)
assert.EqualError(testSuite.T(), err, outOfCapacityError)
}

func (testSuite *MemoryBlockTest) TestMemoryBlockWriteWithMultipleWrites() {
mb := createBlock(12)
err := mb.Write([]byte("hi"))
assert.Nil(testSuite.T(), err)
err = mb.Write([]byte("hello"))
assert.Nil(testSuite.T(), err)

output, err := io.ReadAll(mb.Reader())
assert.Nil(testSuite.T(), err)
assert.Equal(testSuite.T(), []byte("hihello"), output)
assert.Equal(testSuite.T(), int64(7), mb.Size())
}

func (testSuite *MemoryBlockTest) TestMemoryBlockWriteWith2ndWriteBeyondCapacity() {
mb := createBlock(2)
content := []byte("hi")
err := mb.Write(content)
assert.Nil(testSuite.T(), err)
err = mb.Write(content)

assert.NotNil(testSuite.T(), err)
assert.EqualError(testSuite.T(), err, outOfCapacityError)
}

func (testSuite *MemoryBlockTest) TestMemoryBlockReuse() {
mb := createBlock(12)
content := []byte("hi")
err := mb.Write(content)
assert.Nil(testSuite.T(), err)
output, err := io.ReadAll(mb.Reader())
assert.Nil(testSuite.T(), err)
assert.Equal(testSuite.T(), content, output)
assert.Equal(testSuite.T(), int64(2), mb.Size())

mb.Reuse()

output, err = io.ReadAll(mb.Reader())
assert.Nil(testSuite.T(), err)
assert.Empty(testSuite.T(), output)
assert.Equal(testSuite.T(), int64(0), mb.Size())
}

// Other cases for Size are covered as part of write tests.
func (testSuite *MemoryBlockTest) TestMemoryBlockSizeForEmptyBlock() {
mb := createBlock(12)

assert.Equal(testSuite.T(), int64(0), mb.Size())
}

// Other cases for reader are covered as part of write tests.
func (testSuite *MemoryBlockTest) TestMemoryBlockReaderForEmptyBlock() {
mb := createBlock(12)

output, err := io.ReadAll(mb.Reader())
assert.Nil(testSuite.T(), err)
assert.Empty(testSuite.T(), output)
assert.Equal(testSuite.T(), int64(0), mb.Size())
}

0 comments on commit 21633e8

Please sign in to comment.