Skip to content

Commit

Permalink
Blob feed iteration with dynamo pagination (#1252)
Browse files Browse the repository at this point in the history
  • Loading branch information
jianoaix authored Feb 10, 2025
1 parent a86da1e commit 6382132
Show file tree
Hide file tree
Showing 2 changed files with 127 additions and 24 deletions.
86 changes: 63 additions & 23 deletions disperser/common/v2/blobstore/dynamo_metadata_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,35 +290,61 @@ func (s *BlobMetadataStore) GetBlobMetadataByStatus(ctx context.Context, status
return metadata, nil
}

// queryBucketBlobMetadata queries a single bucket for blob metadata within the given key range.
// queryBucketBlobMetadata returns blobs (as metadata) within range [startKey, endKey] from a single bucket.
// Results are ordered by <RequestedAt, BlobKey> in ascending order.
//
// The function handles DynamoDB's 1MB response size limitation by performing multiple queries if necessary.
func (s *BlobMetadataStore) queryBucketBlobMetadata(
ctx context.Context,
bucket uint64,
startKey string,
endKey string,
) ([]*v2.BlobMetadata, error) {
items, err := s.dynamoDBClient.QueryIndex(
ctx,
s.tableName,
RequestedAtIndexName,
"RequestedAtBucket = :pk AND RequestedAtBlobKey BETWEEN :start AND :end",
commondynamodb.ExpressionValues{
":pk": &types.AttributeValueMemberS{Value: fmt.Sprintf("%d", bucket)},
":start": &types.AttributeValueMemberS{Value: startKey},
":end": &types.AttributeValueMemberS{Value: endKey},
},
)
if err != nil {
return nil, fmt.Errorf("query failed for bucket %d: %w", bucket, err)
}
metadata := make([]*v2.BlobMetadata, 0)
var lastEvaledKey map[string]types.AttributeValue

metadata := make([]*v2.BlobMetadata, 0, len(items))
for _, item := range items {
bm, err := UnmarshalBlobMetadata(item)
for {
start := startKey
if lastEvaledKey != nil {
requestedAtBlobkey, err := UnmarshalRequestedAtBlobKey(lastEvaledKey)
if err != nil {
return nil, fmt.Errorf("failed to parse the RequestedAtBlobkey from the LastEvaluatedKey: %w", err)
}
start = requestedAtBlobkey
}

res, err := s.dynamoDBClient.QueryIndexWithPagination(
ctx,
s.tableName,
RequestedAtIndexName,
"RequestedAtBucket = :pk AND RequestedAtBlobKey BETWEEN :start AND :end",
commondynamodb.ExpressionValues{
":pk": &types.AttributeValueMemberS{Value: fmt.Sprintf("%d", bucket)},
":start": &types.AttributeValueMemberS{Value: start},
":end": &types.AttributeValueMemberS{Value: endKey},
},
0, // no limit within a bucket
lastEvaledKey,
)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal blob metadata: %w", err)
return nil, fmt.Errorf("query failed for bucket %d: %w", bucket, err)
}
metadata = append(metadata, bm)

// Collect results
for _, item := range res.Items {
bm, err := UnmarshalBlobMetadata(item)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal blob metadata: %w", err)
}
metadata = append(metadata, bm)
}

// Exhausted all items already
if res.LastEvaluatedKey == nil {
break
}
// For next iteration
lastEvaledKey = res.LastEvaluatedKey
}

return metadata, nil
Expand All @@ -339,7 +365,7 @@ func (s *BlobMetadataStore) GetBlobMetadataByRequestedAt(
return nil, nil, errors.New("start cursor is expected to be less than end cursor")
}

startBucket, endBucket := getRequestedAtBucketIDRange(start.RequestedAt, end.RequestedAt)
startBucket, endBucket := GetRequestedAtBucketIDRange(start.RequestedAt, end.RequestedAt)
startKey := start.ToCursorKey()
endKey := end.ToCursorKey()

Expand Down Expand Up @@ -1357,6 +1383,20 @@ func UnmarshalBatchHeaderHash(item commondynamodb.Item) ([32]byte, error) {
return hexToHash(root)
}

// UnmarshalRequestedAtBlobKey decodes the given DynamoDB item and returns the
// value of its RequestedAtBlobKey attribute.
//
// Any error reported by the attribute decoder is returned unchanged together
// with an empty string.
func UnmarshalRequestedAtBlobKey(item commondynamodb.Item) (string, error) {
	// Only the one attribute of interest is declared; the field carries no
	// struct tag, so it is decoded under its own name.
	var decoded struct {
		RequestedAtBlobKey string
	}

	if err := attributevalue.UnmarshalMap(item, &decoded); err != nil {
		return "", err
	}
	return decoded.RequestedAtBlobKey, nil
}

func UnmarshalAttestedAt(item commondynamodb.Item) (uint64, error) {
type Object struct {
AttestedAt uint64
Expand Down Expand Up @@ -1570,9 +1610,9 @@ func computeAttestedAtBucket(attestedAt uint64) string {
return fmt.Sprintf("%d", id)
}

// getRequestedAtBucketIDRange returns the adjusted start and end bucket IDs based on
// GetRequestedAtBucketIDRange returns the adjusted start and end bucket IDs based on
// the allowed time range for blobs.
func getRequestedAtBucketIDRange(startTime, endTime uint64) (uint64, uint64) {
func GetRequestedAtBucketIDRange(startTime, endTime uint64) (uint64, uint64) {
now := uint64(time.Now().UnixNano())
oldestAllowed := now - maxBlobAgeInNano

Expand Down
65 changes: 64 additions & 1 deletion disperser/common/v2/blobstore/dynamo_metadata_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,69 @@ func TestBlobMetadataStoreGetBlobMetadataByRequestedAtWithIdenticalTimestamp(t *
}
}

// TestBlobMetadataStoreGetBlobMetadataByRequestedAtWithDynamoPagination writes
// numBlobs blobs whose RequestedAt timestamps all fall in a single RequestedAt
// bucket, then verifies that one GetBlobMetadataByRequestedAt call (limit 0)
// returns every blob and reports the last blob as the last processed cursor.
func TestBlobMetadataStoreGetBlobMetadataByRequestedAtWithDynamoPagination(t *testing.T) {
	ctx := context.Background()

	// Make all blobs happen in 120s
	numBlobs := 1200
	nanoSecsPerBlob := uint64(1e8) // 10 blobs per second

	now := uint64(time.Now().UnixNano())
	firstBlobTime := now - uint64(10*time.Minute.Nanoseconds())
	// Adjust "now" so all blobs will deterministically fall in just one
	// bucket.
	startBucket, endBucket := blobstore.GetRequestedAtBucketIDRange(firstBlobTime-1, now)
	if startBucket < endBucket {
		now -= uint64(11 * time.Minute.Nanoseconds())
		firstBlobTime = now - uint64(10*time.Minute.Nanoseconds())
	}
	// Re-check with the RequestedAt bucketing (the index this test queries),
	// not the AttestedAt one, so the single-bucket precondition is validated
	// against the right scheme.
	startBucket, endBucket = blobstore.GetRequestedAtBucketIDRange(firstBlobTime-1, now)
	require.Equal(t, startBucket, endBucket)

	// Create blobs for testing
	// The num of blobs here are large enough to make it more than 1MB (the max response
	// size of DynamoDB) so it will have to use DynamoDB's pagination to get all desired
	// results.
	keys := make([]corev2.BlobKey, numBlobs)
	dynamoKeys := make([]commondynamodb.Key, numBlobs)
	for i := 0; i < numBlobs; i++ {
		blobKey, blobHeader := newBlob(t)
		// Wall-clock time used only for Expiry/UpdatedAt; named distinctly so it
		// does not shadow the uint64 "now" that anchors the RequestedAt window.
		wallClock := time.Now()
		metadata := &v2.BlobMetadata{
			BlobHeader:  blobHeader,
			Signature:   []byte{1, 2, 3},
			BlobStatus:  v2.Encoded,
			Expiry:      uint64(wallClock.Add(time.Hour).Unix()),
			NumRetries:  0,
			UpdatedAt:   uint64(wallClock.UnixNano()),
			RequestedAt: firstBlobTime + nanoSecsPerBlob*uint64(i),
		}
		err := blobMetadataStore.PutBlobMetadata(ctx, metadata)
		require.NoError(t, err)
		keys[i] = blobKey
		dynamoKeys[i] = commondynamodb.Key{
			"PK": &types.AttributeValueMemberS{Value: "BlobKey#" + blobKey.Hex()},
			"SK": &types.AttributeValueMemberS{Value: "BlobMetadata"},
		}
	}
	defer deleteItems(t, dynamoKeys)

	startCursor := blobstore.BlobFeedCursor{
		RequestedAt: firstBlobTime,
		BlobKey:     nil,
	}
	endCursor := blobstore.BlobFeedCursor{
		RequestedAt: now + 1,
		BlobKey:     nil,
	}
	blobs, lastProcessedCursor, err := blobMetadataStore.GetBlobMetadataByRequestedAt(ctx, startCursor, endCursor, 0)
	require.NoError(t, err)
	require.Equal(t, numBlobs, len(blobs))
	require.NotNil(t, lastProcessedCursor)
	// The last processed cursor must point at the final blob written.
	assert.Equal(t, firstBlobTime+nanoSecsPerBlob*uint64(numBlobs-1), lastProcessedCursor.RequestedAt)
	assert.Equal(t, keys[numBlobs-1], *lastProcessedCursor.BlobKey)
}

func TestBlobMetadataStoreGetBlobMetadataByRequestedAt(t *testing.T) {
ctx := context.Background()
numBlobs := 103
Expand Down Expand Up @@ -661,7 +724,7 @@ func TestBlobMetadataStoreGetAttestationByAttestedAt(t *testing.T) {
})
}

func TestBlobMetadataStoreGetAttestationByAttestedAtPagination(t *testing.T) {
func TestBlobMetadataStoreGetAttestationByAttestedAtWithDynamoPagination(t *testing.T) {
ctx := context.Background()

now := uint64(time.Now().UnixNano())
Expand Down

0 comments on commit 6382132

Please sign in to comment.