Skip to content

Commit

Permalink
Relaxing API to not always require a range matrix.
Browse files Browse the repository at this point in the history
.
  • Loading branch information
liaud committed Jan 9, 2025
1 parent 4a669dd commit 9fd1b19
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 98 deletions.
90 changes: 32 additions & 58 deletions backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ char* linearize(int k, char **in, int inlen, char *dest, uint64_t destlen, uint6
// the buffer
int previous_idx = -1;
for (i = 0; i < inlen; i++) {
int index;
int index = get_fragment_idx(in[i]);
if (orig_data_size < 0) {
orig_data_size = get_orig_data_size(in[i]);
Expand Down Expand Up @@ -820,19 +820,18 @@ func (backend *Backend) ValidateFragmentMatrix(frag RawFragment, pieceSize int)
return true
}

func (backend *Backend) LinearizeMatrix(frags []ValidatedFragment, rangeM *RangeMatrix) (*DecodeData, error) {
func (backend *Backend) LinearizeMatrix(frags []ValidatedFragment, pieceSize int) (*DecodeData, error) {
var wg sync.WaitGroup

if len(frags) == 0 {
return nil, errors.New("linearizing requires at least one fragment")
}

fragRangeLen := rangeM.InFragRangeEndExcl - rangeM.InFragRangeStartIncl
piecesize := rangeM.PieceSize
lenBlock := piecesize + backend.headerSize
numBlock := fragRangeLen / lenBlock
if numBlock*lenBlock != fragRangeLen {
numBlock++
fragRangeLen := len(frags[0])
chunkSize := pieceSize + backend.headerSize
nrChunks := fragRangeLen / chunkSize
if nrChunks*chunkSize != fragRangeLen {
nrChunks++
}

/* Fragments are sorted beforehand with the index of the first chunk.
Expand Down Expand Up @@ -884,28 +883,28 @@ func (backend *Backend) LinearizeMatrix(frags []ValidatedFragment, rangeM *Range
}
fragsIndex = fragsIndex[:lastDataFragIdxExcl]

dataB, data := backend.pool.New(numBlock * piecesize * len(fragsIndex))
dataB, data := backend.pool.New(nrChunks * pieceSize * len(fragsIndex))
errorNb := uint32(0)
totLen := uint64(0)
wg.Add(numBlock)
wg.Add(nrChunks)

for i := 0; i < numBlock; i++ {
for i := 0; i < nrChunks; i++ {
// launch goroutines, providing them a subrange of the final buffer so it can be used
// in concurrency without need to lock it access
go func(chunkIdx int) {
func(chunkIdx int) {
cDataFrags := C.makeStrArray(C.int(len(fragsIndex)))
// prepare the C array of pointer, respecting the offset in each fragments

for i, idx := range fragsIndex {
frag := frags[idx]
cSetArrayItem(cDataFrags, i, (*C.char)(unsafe.Pointer(&frag[chunkIdx*lenBlock])))
cSetArrayItem(cDataFrags, i, (*C.char)(unsafe.Pointer(&frag[chunkIdx*chunkSize])))
}
// try to decode fastly (if we have all data fragments), providing the good offset of the
// linearized buffer, according the block number we are decoding
var outlen C.uint64_t
p := C.linearize(C.int(backend.K), cDataFrags, C.int(len(fragsIndex)),
(*C.char)(unsafe.Pointer(&data[chunkIdx*piecesize*len(fragsIndex)])),
C.uint64_t(piecesize*backend.K), &outlen)
(*C.char)(unsafe.Pointer(&data[chunkIdx*pieceSize*len(fragsIndex)])),
C.uint64_t(pieceSize*backend.K), &outlen)

if p == nil {
atomic.AddUint32(&errorNb, 1)
Expand All @@ -925,29 +924,23 @@ func (backend *Backend) LinearizeMatrix(frags []ValidatedFragment, rangeM *Range
return nil, errors.New("failed to linearize fragments")
}

reqLen := rangeM.ReqEndIncl - rangeM.ReqStartIncl + 1
if uint64(reqLen) > totLen || uint64(rangeM.DecodedRangeStartIncl) >= totLen {
return nil, errors.New("out of bounds range matrix")
}
linearizedRangeEndExcl := rangeM.LinearizedRangeStartIncl + reqLen

return &DecodeData{
data[rangeM.LinearizedRangeStartIncl:linearizedRangeEndExcl:linearizedRangeEndExcl],
data[:totLen:totLen],
func() {
backend.pool.Release(dataB)
}}, nil
}

// DecodeMatrix tries to reconstruct the data fragments and returns the linearized data.
func (backend *Backend) DecodeMatrix(frags []ValidatedFragment, rangeM *RangeMatrix) (*DecodeData, error) {
fragRangeLen := rangeM.InFragRangeEndExcl - rangeM.InFragRangeStartIncl
chunkSize := rangeM.PieceSize + backend.headerSize
func (backend *Backend) DecodeMatrix(frags []ValidatedFragment, pieceSize int) (*DecodeData, error) {
fragRangeLen := len(frags[0])
chunkSize := pieceSize + backend.headerSize
chunkNr := fragRangeLen / chunkSize
if chunkNr*chunkSize != fragRangeLen {
chunkNr++
}

dataB, data := backend.pool.New(chunkNr * rangeM.PieceSize * backend.K)
dataB, data := backend.pool.New(chunkNr * pieceSize * backend.K)

var totLen int64

Expand All @@ -969,13 +962,7 @@ func (backend *Backend) DecodeMatrix(frags []ValidatedFragment, rangeM *RangeMat
subdata.Free()
}

reqLen := rangeM.ReqEndIncl - rangeM.ReqStartIncl + 1
if int64(reqLen) > totLen || int64(rangeM.DecodedRangeStartIncl) >= totLen {
return nil, errors.New("out of bounds range matrix")
}
decodedRangeEndExcl := rangeM.DecodedRangeStartIncl + reqLen

return &DecodeData{data[rangeM.DecodedRangeStartIncl:decodedRangeEndExcl:decodedRangeEndExcl], func() {
return &DecodeData{data[:totLen:totLen], func() {
backend.pool.Release(dataB)
}}, nil
}
Expand All @@ -985,16 +972,7 @@ type RangeMatrix struct {
ReqStartIncl int
ReqEndIncl int

PieceSize int
FragSize int

/* The first and last fragment that the range spans.
Being interleaved, it is possible that FragStartIncl > FragEndIncl
when the range wraps around fragments.
It doesn't include coding fragments as any repair will requires at
least K fragments. */
/* The fragments that the range spans. */
FragFirstIncl int
FragCount int

Expand Down Expand Up @@ -1032,7 +1010,7 @@ type RangeMatrix struct {
* ranges by '[]':
*
* p1 [-[- *]-] The request here start at the end of the 2nd chunk in p4
* [-[- *]-] then wraps around in the subsequent chunks in p1 and p2.
* [-[- *]-] then wraps around in the subsequent chunks in p1 and p2.
* .. [-[- -]-]
* p4 [-[* -]-] When this occurs, we will still query, p4 just to decode
* the relevant requested range. The decoded buffer
Expand Down Expand Up @@ -1066,15 +1044,20 @@ func (backend *Backend) GetRangeMatrix(startIncl, endIncl, pieceSize, fragSize i
pieceStartIncl := startIncl / pieceSize
pieceEndIncl := endIncl / pieceSize

groupStartIncl := pieceStartIncl / backend.K
groupEndIncl := pieceEndIncl / backend.K

fragFirstIncl := pieceStartIncl % backend.K
fragCount := (pieceEndIncl + 1 - pieceStartIncl)
if fragCount > backend.K {
dataOffset := pieceStartIncl * pieceSize

/* When wrapping around, we read the full groups. */
if fragFirstIncl+fragCount > backend.K {
fragFirstIncl = 0
fragCount = backend.K
dataOffset = groupStartIncl * groupSize
}

groupStartIncl := pieceStartIncl / backend.K
groupEndIncl := pieceEndIncl / backend.K

/* For each fragment, this is the minimum range to read -- including
the header -- to decode or repair the data. */
inFragRangeStartIncl := groupStartIncl * chunkSize
Expand All @@ -1087,23 +1070,14 @@ func (backend *Backend) GetRangeMatrix(startIncl, endIncl, pieceSize, fragSize i
Special care is needed whe the requested range wraps in the
fragments. In that case we degenerate to querying all groups of
all fragments (see (2) in the function's comment above). */
var linearizedRangeStartIncl int
if fragFirstIncl+fragCount > backend.K {
fragFirstIncl = 0
fragCount = backend.K
linearizedRangeStartIncl = startIncl - groupStartIncl*groupSize
} else {
linearizedRangeStartIncl = startIncl - pieceStartIncl*pieceSize
}
linearizedRangeStartIncl := startIncl - dataOffset

/* Decoding always works on a group boundary. */
decodedRangeStartIncl := startIncl - groupStartIncl*groupSize

return &RangeMatrix{
ReqStartIncl: startIncl,
ReqEndIncl: endIncl,
PieceSize: pieceSize,
FragSize: fragSize,
FragFirstIncl: fragFirstIncl,
FragCount: fragCount,
InFragRangeStartIncl: inFragRangeStartIncl,
Expand Down
63 changes: 25 additions & 38 deletions backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -581,12 +581,7 @@ func BenchmarkLinearizeM(b *testing.B) {

b.ResetTimer()
for i := 0; i < b.N; i++ {
rangeM := backend.GetRangeMatrix(0, test.size-1, DefaultChunkSize, len(encoded.Data[0]))
if rangeM == nil {
b.Fatal("could not generate range matrix")
}

decoded, err := backend.LinearizeMatrix(encoded.Data, rangeM)
decoded, err := backend.LinearizeMatrix(encoded.Data, DefaultChunkSize)
if err != nil {
b.Fatal(err)
}
Expand Down Expand Up @@ -622,12 +617,7 @@ func BenchmarkDecodeM(b *testing.B) {
data := encoded.Data[1:]
b.ResetTimer()
for i := 0; i < b.N; i++ {
rangeM := backend.GetRangeMatrix(0, test.size-1, DefaultChunkSize, len(encoded.Data[0]))
if rangeM == nil {
b.Fatal("could not generate range matrix")
}

decoded, err := backend.DecodeMatrix(data, rangeM)
decoded, err := backend.DecodeMatrix(data, DefaultChunkSize)
if err != nil {
b.Fatal(err)
}
Expand Down Expand Up @@ -727,13 +717,9 @@ func BenchmarkMatrix(b *testing.B) {
encoded, _ := backend.EncodeMatrix(buf, blockSize)
defer encoded.Free()

rangeM := backend.GetRangeMatrix(0, dtest.size-1, blockSize, len(encoded.Data[0]))
if rangeM == nil {
b.Fatal("could not generate range matrix")
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
decoded, err := backend.LinearizeMatrix(encoded.Data, rangeM)
decoded, err := backend.LinearizeMatrix(encoded.Data, blockSize)
if err != nil {
b.Fatal(err)
}
Expand Down Expand Up @@ -811,10 +797,7 @@ func TestEncodeM(t *testing.T) {

// Check that our linearized buffer
// contains expected data when there is all data fragment.
rangeM := backend.GetRangeMatrix(0, len(buf)-1, p.chunkUnit, len(result.Data[0]))
assert.NotNil(t, rangeM)

ddata, err := backend.LinearizeMatrix(result.Data, rangeM)
ddata, err := backend.LinearizeMatrix(result.Data, p.chunkUnit)
assert.NoError(t, err)
assert.Equal(t, len(buf), len(ddata.Data), "data mismatch")
assert.Equalf(t, buf, ddata.Data, "data mismatch")
Expand All @@ -829,7 +812,7 @@ func TestEncodeM(t *testing.T) {
vect = append(vect, result.Data[4])
vect = append(vect, result.Data[5])

ddata2, _ := backend.DecodeMatrix(vect, rangeM)
ddata2, _ := backend.DecodeMatrix(vect, p.chunkUnit)
assert.Equal(t, buf, ddata2.Data, "data mismatch")
ddata2.Free()

Expand Down Expand Up @@ -882,24 +865,27 @@ func TestLinearizeMatrix(t *testing.T) {
defer encoded.Free()

fragSize := len(encoded.Data[0])
rangeMatrix := backend.GetRangeMatrix(startIncl, endIncl, pieceSize, fragSize)
assert.NotNil(rangeMatrix)
rangeM := backend.GetRangeMatrix(startIncl, endIncl, pieceSize, fragSize)
assert.NotNil(rangeM)

/* Decode the matrix as if it was requested and
checks that the result matches the payload on the requested range. */
frags := make([][]byte, 0)
for i := 0; i < rangeMatrix.FragCount; i += 1 {
fragIdx := (rangeMatrix.FragFirstIncl + i) % k
buffer := encoded.Data[fragIdx][rangeMatrix.InFragRangeStartIncl:rangeMatrix.InFragRangeEndExcl]
for i := 0; i < rangeM.FragCount; i += 1 {
fragIdx := (rangeM.FragFirstIncl + i) % k
buffer := encoded.Data[fragIdx][rangeM.InFragRangeStartIncl:rangeM.InFragRangeEndExcl]
frags = append(frags, buffer)
}

decoded, err := backend.LinearizeMatrix(frags, rangeMatrix)
decoded, err := backend.LinearizeMatrix(frags, pieceSize)
assert.Nil(err)
defer decoded.Free()

expected := data[startIncl : endIncl+1]
return bytes.Equal(expected, decoded.Data)

linearizedRangeEndExcl := rangeM.LinearizedRangeStartIncl + (endIncl - startIncl) + 1
found := decoded.Data[rangeM.LinearizedRangeStartIncl:linearizedRangeEndExcl]
return bytes.Equal(expected, found)
}

config := quick.Config{
Expand Down Expand Up @@ -955,8 +941,8 @@ func TestDecodeMatrix(t *testing.T) {
defer encoded.Free()

fragSize := len(encoded.Data[0])
rangeMatrix := backend.GetRangeMatrix(startIncl, endIncl, pieceSize, fragSize)
assert.NotNil(rangeMatrix)
rangeM := backend.GetRangeMatrix(startIncl, endIncl, pieceSize, fragSize)
assert.NotNil(rangeM)

/* Decode the matrix as if it was requested and
checks that the result matches the payload on the requested range. */
Expand All @@ -967,16 +953,19 @@ func TestDecodeMatrix(t *testing.T) {
continue
}

buffer := encoded.Data[fragIdx][rangeMatrix.InFragRangeStartIncl:rangeMatrix.InFragRangeEndExcl]
buffer := encoded.Data[fragIdx][rangeM.InFragRangeStartIncl:rangeM.InFragRangeEndExcl]
frags = append(frags, buffer)
}

decoded, err := backend.DecodeMatrix(frags, rangeMatrix)
decoded, err := backend.DecodeMatrix(frags, pieceSize)
assert.Nil(err)
defer decoded.Free()

expected := data[startIncl : endIncl+1]
return bytes.Equal(expected, decoded.Data)

decodedRangeEndExcl := rangeM.DecodedRangeStartIncl + (endIncl - startIncl) + 1
found := decoded.Data[rangeM.DecodedRangeStartIncl:decodedRangeEndExcl]
return bytes.Equal(expected, found)
}

config := quick.Config{
Expand Down Expand Up @@ -1011,7 +1000,7 @@ func TestValidateFragmentMatrix(t *testing.T) {
defer encoded.Free()

fragSize := len(encoded.Data[0])
for i := 0; i < 1; i += 1 {
for i := 0; i < len(encoded.Data); i += 1 {
rangeMatrix := backend.GetRangeMatrix(0, dataSize-1, pieceSize, fragSize)
assert.NotNil(rangeMatrix)

Expand Down Expand Up @@ -1138,9 +1127,7 @@ func TestEncodeDecodeMatrix(t *testing.T) {

frags := data.Data
decode := func(frags [][]byte, description string) bool {
rangeM := backend.GetRangeMatrix(0, len(pattern)-1, 32768, len(data.Data[0]))

decoded, err := backend.DecodeMatrix(frags, rangeM)
decoded, err := backend.DecodeMatrix(frags, 32768)
if err != nil {
t.Errorf("%v: %v: %q for pattern %d", description, backend, err, patternIndex)
return false
Expand Down
3 changes: 1 addition & 2 deletions buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,7 @@ func TestEncodeBufferMatrix(t *testing.T) {
assert.Equal(t, b.FragLen(), len(frags[i]))
}

rangeM := backend.GetRangeMatrix(0, size-1, blockSize, len(frags[0]))
decoded, err := backend.DecodeMatrix(frags, rangeM)
decoded, err := backend.DecodeMatrix(frags, blockSize)
assert.NoError(t, err)
if err != nil {
t.FailNow()
Expand Down

0 comments on commit 9fd1b19

Please sign in to comment.