From 8b4059867738d0e3f3da9a3c82bb5cd27b0a355c Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Tue, 15 Oct 2024 16:59:03 -0700 Subject: [PATCH] remove chunk data pack --- .../cmd/rollback_executed_height.go | 56 ++++++++++++++++--- storage/pebble/chunk_data_packs.go | 4 ++ 2 files changed, 52 insertions(+), 8 deletions(-) diff --git a/cmd/util/cmd/rollback-executed-height/cmd/rollback_executed_height.go b/cmd/util/cmd/rollback-executed-height/cmd/rollback_executed_height.go index 33f6622cc1b..1958630cba5 100644 --- a/cmd/util/cmd/rollback-executed-height/cmd/rollback_executed_height.go +++ b/cmd/util/cmd/rollback-executed-height/cmd/rollback_executed_height.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" + "github.com/cockroachdb/pebble" "github.com/rs/zerolog/log" "github.com/spf13/cobra" @@ -13,11 +14,13 @@ import ( "github.com/onflow/flow-go/state/protocol" "github.com/onflow/flow-go/storage" "github.com/onflow/flow-go/storage/badger" + storagepebble "github.com/onflow/flow-go/storage/pebble" ) var ( - flagHeight uint64 - flagDataDir string + flagHeight uint64 + flagDataDir string + flagChunkDataPackDir string ) var Cmd = &cobra.Command{ @@ -36,6 +39,10 @@ func init() { Cmd.Flags().StringVar(&flagDataDir, "datadir", "", "directory that stores the protocol state") _ = Cmd.MarkFlagRequired("datadir") + + Cmd.Flags().StringVar(&flagChunkDataPackDir, "chunk-data-pack-dir", "", + "directory that stores the chunk data pack") + _ = Cmd.MarkFlagRequired("chunk-data-pack-dir") } func run(*cobra.Command, []string) { @@ -51,16 +58,33 @@ func run(*cobra.Command, []string) { } db := common.InitStorage(flagDataDir) + defer func() { + err := db.Close() + if err != nil { + log.Fatal().Err(err).Msg("could not close db") + } + }() + storages := common.InitStorages(db) state, err := common.InitProtocolState(db, storages) if err != nil { log.Fatal().Err(err).Msg("could not init protocol states") } + chunkDataPackDB, err := storagepebble.OpenDefaultPebbleDB(flagChunkDataPackDir) + if err != nil { + log.Fatal().Err(err).Msg("could not open chunk data pack db") + } + defer func() { + err := chunkDataPackDB.Close() + if err != nil { + log.Fatal().Err(err).Msg("could not close chunk data pack db") + } + }() + metrics := &metrics.NoopCollector{} transactionResults := badger.NewTransactionResults(metrics, db, badger.DefaultCacheSize) commits := badger.NewCommits(metrics, db) - chunkDataPacks := badger.NewChunkDataPacks(metrics, db, badger.NewCollections(db, badger.NewTransactions(metrics, db)), badger.DefaultCacheSize) results := badger.NewExecutionResults(metrics, db) receipts := badger.NewExecutionReceipts(metrics, db, results, badger.DefaultCacheSize) myReceipts := badger.NewMyExecutionReceipts(metrics, db, receipts) @@ -70,8 +94,13 @@ func run(*cobra.Command, []string) { writeBatch := badger.NewBatch(db) + chunkDataPacks := storagepebble.NewChunkDataPacks(metrics, + chunkDataPackDB, storages.Collections, 1000) + pebbleWriter := chunkDataPackDB.NewBatch() + err = removeExecutionResultsFromHeight( writeBatch, + pebbleWriter, state, headers, transactionResults, @@ -86,11 +115,20 @@ func run(*cobra.Command, []string) { if err != nil { log.Fatal().Err(err).Msgf("could not remove result from height %v", flagHeight) } + + log.Info().Msgf("flushing badger write batch at %v", flagHeight) err = writeBatch.Flush() if err != nil { log.Fatal().Err(err).Msgf("could not flush write batch at %v", flagHeight) } + log.Info().Msgf("flushing pebble writer at %v", flagHeight) + + err = pebbleWriter.Commit(pebble.Sync) + if err != nil { + log.Fatal().Err(err).Msgf("could not flush pebble writer at %v", flagHeight) + } + header, err := state.AtHeight(flagHeight).Head() if err != nil { log.Fatal().Err(err).Msgf("could not get block header at height %v", flagHeight) @@ -109,11 +147,12 @@ func run(*cobra.Command, []string) { // need to include the Remove methods func removeExecutionResultsFromHeight( writeBatch *badger.Batch, + pebbleWriter pebble.Writer, protoState protocol.State, headers *badger.Headers, transactionResults *badger.TransactionResults, commits *badger.Commits, - chunkDataPacks *badger.ChunkDataPacks, + chunkDataPacks *storagepebble.ChunkDataPacks, results *badger.ExecutionResults, myReceipts *badger.MyExecutionReceipts, events *badger.Events, @@ -148,7 +187,7 @@ func removeExecutionResultsFromHeight( blockID := head.ID() - err = removeForBlockID(writeBatch, headers, commits, transactionResults, results, chunkDataPacks, myReceipts, events, serviceEvents, blockID) + err = removeForBlockID(writeBatch, pebbleWriter, headers, commits, transactionResults, results, chunkDataPacks, myReceipts, events, serviceEvents, blockID) if err != nil { return fmt.Errorf("could not remove result for finalized block: %v, %w", blockID, err) } @@ -167,7 +206,7 @@ func removeExecutionResultsFromHeight( total = len(pendings) for _, pending := range pendings { - err = removeForBlockID(writeBatch, headers, commits, transactionResults, results, chunkDataPacks, myReceipts, events, serviceEvents, pending) + err = removeForBlockID(writeBatch, pebbleWriter, headers, commits, transactionResults, results, chunkDataPacks, myReceipts, events, serviceEvents, pending) if err != nil { return fmt.Errorf("could not remove result for pending block %v: %w", pending, err) @@ -188,11 +227,12 @@ func removeExecutionResultsFromHeight( // It bubbles up any error encountered func removeForBlockID( writeBatch *badger.Batch, + pebbleWriter pebble.Writer, headers *badger.Headers, commits *badger.Commits, transactionResults *badger.TransactionResults, results *badger.ExecutionResults, - chunks *badger.ChunkDataPacks, + chunks *storagepebble.ChunkDataPacks, myReceipts *badger.MyExecutionReceipts, events *badger.Events, serviceEvents *badger.ServiceEvents, @@ -211,7 +251,7 @@ func removeForBlockID( for _, chunk := range result.Chunks { chunkID := chunk.ID() // remove chunk data pack - err := chunks.BatchRemove(chunkID, writeBatch) + err := chunks.PebbleBatchRemove(chunkID, pebbleWriter) if errors.Is(err, storage.ErrNotFound) { log.Warn().Msgf("chunk %v not found for block %v", chunkID, blockID) continue diff --git a/storage/pebble/chunk_data_packs.go b/storage/pebble/chunk_data_packs.go index c0b5b47eeab..f1bedae2f1d 100644 --- a/storage/pebble/chunk_data_packs.go +++ b/storage/pebble/chunk_data_packs.go @@ -126,6 +126,10 @@ func (ch *ChunkDataPacks) BatchRemove(chunkID flow.Identifier, batch storage.Bat return fmt.Errorf("not implemented") } +func (ch *ChunkDataPacks) PebbleBatchRemove(chunkID flow.Identifier, batch pebble.Writer) error { + return operation.RemoveChunkDataPack(chunkID)(batch) +} + func (ch *ChunkDataPacks) batchRemove(chunkID flow.Identifier, batch pebble.Writer) error { return operation.RemoveChunkDataPack(chunkID)(batch) }