From 9e5913c020117cefd315ec49f03d98aecc50a78b Mon Sep 17 00:00:00 2001 From: Janez Podhostnik Date: Thu, 6 Jun 2024 16:57:31 +0200 Subject: [PATCH 01/19] Change metrics collection in computer --- .../computation/computer/computer_test.go | 23 +++---- .../computation/computer/result_collector.go | 69 +++++++++++-------- module/metrics.go | 45 ++++++++---- module/metrics/example/execution/main.go | 18 +++-- module/metrics/execution.go | 25 +++---- module/metrics/noop.go | 61 ++++++++-------- module/mock/execution_metrics.go | 10 +-- 7 files changed, 140 insertions(+), 111 deletions(-) diff --git a/engine/execution/computation/computer/computer_test.go b/engine/execution/computation/computer/computer_test.go index 0f43e1df54d..419186588ff 100644 --- a/engine/execution/computation/computer/computer_test.go +++ b/engine/execution/computation/computer/computer_test.go @@ -45,6 +45,7 @@ import ( "github.com/onflow/flow-go/ledger/common/pathfinder" "github.com/onflow/flow-go/ledger/complete" "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/module" "github.com/onflow/flow-go/module/epochs" "github.com/onflow/flow-go/module/executiondatasync/execution_data" "github.com/onflow/flow-go/module/executiondatasync/provider" @@ -155,13 +156,10 @@ func TestBlockExecutor_ExecuteBlock(t *testing.T) { Times(2) // 1 collection + system collection exemetrics.On("ExecutionTransactionExecuted", - mock.Anything, // duration - mock.Anything, // conflict retry count - mock.Anything, // computation used - mock.Anything, // memory used - mock.Anything, // number of events - mock.Anything, // size of events - false). // no failure + mock.Anything, + mock.MatchedBy(func(arg module.TransactionExecutionResultStats) bool { + return !arg.Failed // only successful transactions + })). Return(nil). Times(2 + 1) // 2 txs in collection + system chunk tx @@ -1267,12 +1265,11 @@ func Test_ExecutingSystemCollection(t *testing.T) { metrics.On("ExecutionTransactionExecuted", mock.Anything, // duration - mock.Anything, // conflict retry count - mock.Anything, // computation used - mock.Anything, // memory used - expectedNumberOfEvents, - expectedEventSize, - false). + mock.MatchedBy(func(arg module.TransactionExecutionResultStats) bool { + return arg.EventCounts == expectedNumberOfEvents && + arg.EventSize == expectedEventSize && + !arg.Failed + })). Return(nil). 
Times(1) // system chunk tx diff --git a/engine/execution/computation/computer/result_collector.go b/engine/execution/computation/computer/result_collector.go index 88aeb7d378d..33f5d692adf 100644 --- a/engine/execution/computation/computer/result_collector.go +++ b/engine/execution/computation/computer/result_collector.go @@ -78,12 +78,12 @@ type resultCollector struct { spockSignatures []crypto.Signature blockStartTime time.Time - blockStats module.ExecutionResultStats + blockStats module.BlockExecutionResultStats blockMeter *meter.Meter currentCollectionStartTime time.Time currentCollectionState *state.ExecutionState - currentCollectionStats module.ExecutionResultStats + currentCollectionStats module.CollectionExecutionResultStats currentCollectionStorageSnapshot execution.ExtendableStorageSnapshot } @@ -123,9 +123,7 @@ func newResultCollector( blockMeter: meter.NewMeter(meter.DefaultParameters()), currentCollectionStartTime: now, currentCollectionState: state.NewExecutionState(nil, state.DefaultParameters()), - currentCollectionStats: module.ExecutionResultStats{ - NumberOfCollections: 1, - }, + currentCollectionStats: module.CollectionExecutionResultStats{}, currentCollectionStorageSnapshot: storehouse.NewExecutingBlockSnapshot( previousBlockSnapshot, *block.StartState, @@ -201,27 +199,16 @@ func (collector *resultCollector) commitCollection( collector.spockSignatures = append(collector.spockSignatures, spock) - collector.currentCollectionStats.EventCounts = len(events) - collector.currentCollectionStats.EventSize = events.ByteSize() - collector.currentCollectionStats.NumberOfRegistersTouched = len( - collectionExecutionSnapshot.AllRegisterIDs()) - for _, entry := range collectionExecutionSnapshot.UpdatedRegisters() { - collector.currentCollectionStats.NumberOfBytesWrittenToRegisters += len( - entry.Value) - } - collector.metrics.ExecutionCollectionExecuted( time.Since(startTime), collector.currentCollectionStats) - collector.blockStats.Merge(collector.currentCollectionStats) + collector.blockStats.Add(collector.currentCollectionStats) collector.blockMeter.MergeMeter(collectionExecutionSnapshot.Meter) collector.currentCollectionStartTime = time.Now() collector.currentCollectionState = state.NewExecutionState(nil, state.DefaultParameters()) - collector.currentCollectionStats = module.ExecutionResultStats{ - NumberOfCollections: 1, - } + collector.currentCollectionStats = module.CollectionExecutionResultStats{} for _, consumer := range collector.consumers { err = consumer.OnExecutedCollection(collector.result.CollectionExecutionResultAt(collection.collectionIndex)) @@ -269,14 +256,12 @@ func (collector *resultCollector) processTransactionResult( logger.Info().Msg("transaction executed successfully") } - collector.metrics.ExecutionTransactionExecuted( + collector.handleTransactionExecutionMetrics( timeSpent, + output, + txnExecutionSnapshot, + txn, numConflictRetries, - output.ComputationUsed, - output.MemoryEstimate, - len(output.Events), - flow.EventsList(output.Events).ByteSize(), - output.Err != nil, ) txnResult := flow.TransactionResult{ @@ -302,10 +287,6 @@ func (collector *resultCollector) processTransactionResult( return fmt.Errorf("failed to merge into collection view: %w", err) } - collector.currentCollectionStats.ComputationUsed += output.ComputationUsed - collector.currentCollectionStats.MemoryUsed += output.MemoryEstimate - collector.currentCollectionStats.NumberOfTransactions += 1 - if !txn.lastTransactionInCollection { return nil } @@ -316,6 +297,38 @@ func (collector 
*resultCollector) processTransactionResult( collector.currentCollectionState.Finalize()) } +func (collector *resultCollector) handleTransactionExecutionMetrics( + timeSpent time.Duration, + output fvm.ProcedureOutput, + txnExecutionSnapshot *snapshot.ExecutionSnapshot, + txn TransactionRequest, + numConflictRetries int, +) { + transactionExecutionStats := module.TransactionExecutionResultStats{ + ExecutionResultStats: module.ExecutionResultStats{ + ComputationUsed: output.ComputationUsed, + MemoryUsed: output.MemoryEstimate, + EventCounts: len(output.Events), + EventSize: output.Events.ByteSize(), + NumberOfRegistersTouched: len(txnExecutionSnapshot.AllRegisterIDs()), + }, + ComputationIntensities: output.ComputationIntensities, + NumberOfTxnConflictRetries: numConflictRetries, + Failed: output.Err != nil, + SystemTransaction: txn.isSystemTransaction, + } + for _, entry := range txnExecutionSnapshot.UpdatedRegisters() { + transactionExecutionStats.NumberOfBytesWrittenToRegisters += len(entry.Value) + } + + collector.metrics.ExecutionTransactionExecuted( + timeSpent, + transactionExecutionStats, + ) + + collector.currentCollectionStats.Add(transactionExecutionStats) +} + func (collector *resultCollector) AddTransactionResult( request TransactionRequest, snapshot *snapshot.ExecutionSnapshot, diff --git a/module/metrics.go b/module/metrics.go index f5071222008..98ff736d696 100644 --- a/module/metrics.go +++ b/module/metrics.go @@ -8,6 +8,7 @@ import ( rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager" httpmetrics "github.com/slok/go-http-metrics/metrics" + "github.com/onflow/flow-go/fvm/meter" "github.com/onflow/flow-go/model/chainsync" "github.com/onflow/flow-go/model/cluster" "github.com/onflow/flow-go/model/flow" @@ -897,8 +898,24 @@ type ExecutionResultStats struct { EventSize int NumberOfRegistersTouched int NumberOfBytesWrittenToRegisters int - NumberOfCollections int - NumberOfTransactions int +} + +type BlockExecutionResultStats struct { + CollectionExecutionResultStats + NumberOfCollections int +} + +type CollectionExecutionResultStats struct { + ExecutionResultStats + NumberOfTransactions int +} + +type TransactionExecutionResultStats struct { + ExecutionResultStats + NumberOfTxnConflictRetries int + Failed bool + SystemTransaction bool + ComputationIntensities meter.MeteredComputationIntensities } func (stats *ExecutionResultStats) Merge(other ExecutionResultStats) { @@ -908,8 +925,17 @@ func (stats *ExecutionResultStats) Merge(other ExecutionResultStats) { stats.EventSize += other.EventSize stats.NumberOfRegistersTouched += other.NumberOfRegistersTouched stats.NumberOfBytesWrittenToRegisters += other.NumberOfBytesWrittenToRegisters - stats.NumberOfCollections += other.NumberOfCollections +} + +func (stats *CollectionExecutionResultStats) Add(other TransactionExecutionResultStats) { + stats.ExecutionResultStats.Merge(other.ExecutionResultStats) + stats.NumberOfTransactions += 1 +} + +func (stats *BlockExecutionResultStats) Add(other CollectionExecutionResultStats) { + stats.CollectionExecutionResultStats.Merge(other.ExecutionResultStats) stats.NumberOfTransactions += other.NumberOfTransactions + stats.NumberOfCollections += 1 } type ExecutionMetrics interface { @@ -936,7 +962,7 @@ type ExecutionMetrics interface { ExecutionLastFinalizedExecutedBlockHeight(height uint64) // ExecutionBlockExecuted reports the total time and computation spent on executing a block - ExecutionBlockExecuted(dur time.Duration, stats ExecutionResultStats) + ExecutionBlockExecuted(dur 
time.Duration, stats BlockExecutionResultStats) // ExecutionBlockExecutionEffortVectorComponent reports the unweighted effort of given ComputationKind at block level ExecutionBlockExecutionEffortVectorComponent(string, uint) @@ -945,17 +971,10 @@ type ExecutionMetrics interface { ExecutionBlockCachedPrograms(programs int) // ExecutionCollectionExecuted reports the total time and computation spent on executing a collection - ExecutionCollectionExecuted(dur time.Duration, stats ExecutionResultStats) + ExecutionCollectionExecuted(dur time.Duration, stats CollectionExecutionResultStats) // ExecutionTransactionExecuted reports stats on executing a single transaction - ExecutionTransactionExecuted( - dur time.Duration, - numTxnConflictRetries int, - compUsed uint64, - memoryUsed uint64, - eventCounts int, - eventSize int, - failed bool) + ExecutionTransactionExecuted(dur time.Duration, stats TransactionExecutionResultStats) // ExecutionChunkDataPackGenerated reports stats on chunk data pack generation ExecutionChunkDataPackGenerated(proofSize, numberOfTransactions int) diff --git a/module/metrics/example/execution/main.go b/module/metrics/example/execution/main.go index 5527419b80a..94837711d2d 100644 --- a/module/metrics/example/execution/main.go +++ b/module/metrics/example/execution/main.go @@ -41,13 +41,17 @@ func main() { collector.ExecutionBlockExecuted( duration, - module.ExecutionResultStats{ - ComputationUsed: uint64(rand.Int63n(1e6)), - MemoryUsed: uint64(rand.Int63n(1e6)), - EventCounts: 2, - EventSize: 100, - NumberOfCollections: 1, - NumberOfTransactions: 1, + module.BlockExecutionResultStats{ + CollectionExecutionResultStats: module.CollectionExecutionResultStats{ + ExecutionResultStats: module.ExecutionResultStats{ + EventSize: 100, + EventCounts: 2, + MemoryUsed: uint64(rand.Int63n(1e6)), + ComputationUsed: uint64(rand.Int63n(1e6)), + }, + NumberOfTransactions: 1, + }, + NumberOfCollections: 1, }) diskIncrease := rand.Int63n(1024 * 1024) diff --git a/module/metrics/execution.go b/module/metrics/execution.go index 1eba13d8ace..bb6fba5ed45 100644 --- a/module/metrics/execution.go +++ b/module/metrics/execution.go @@ -719,7 +719,7 @@ func (ec *ExecutionCollector) FinishBlockReceivedToExecuted(blockID flow.Identif // ExecutionBlockExecuted reports execution meta data after executing a block func (ec *ExecutionCollector) ExecutionBlockExecuted( dur time.Duration, - stats module.ExecutionResultStats, + stats module.BlockExecutionResultStats, ) { ec.totalExecutedBlocksCounter.Inc() ec.blockExecutionTime.Observe(float64(dur.Milliseconds())) @@ -734,7 +734,7 @@ func (ec *ExecutionCollector) ExecutionBlockExecuted( // ExecutionCollectionExecuted reports stats for executing a collection func (ec *ExecutionCollector) ExecutionCollectionExecuted( dur time.Duration, - stats module.ExecutionResultStats, + stats module.CollectionExecutionResultStats, ) { ec.totalExecutedCollectionsCounter.Inc() ec.collectionExecutionTime.Observe(float64(dur.Milliseconds())) @@ -758,23 +758,18 @@ func (ec *ExecutionCollector) ExecutionBlockCachedPrograms(programs int) { // ExecutionTransactionExecuted reports stats for executing a transaction func (ec *ExecutionCollector) ExecutionTransactionExecuted( dur time.Duration, - numConflictRetries int, - compUsed uint64, - memoryUsed uint64, - eventCounts int, - eventSize int, - failed bool, + stats module.TransactionExecutionResultStats, ) { ec.totalExecutedTransactionsCounter.Inc() ec.transactionExecutionTime.Observe(float64(dur.Milliseconds())) - 
ec.transactionConflictRetries.Observe(float64(numConflictRetries)) - ec.transactionComputationUsed.Observe(float64(compUsed)) + ec.transactionConflictRetries.Observe(float64(stats.NumberOfTxnConflictRetries)) + ec.transactionComputationUsed.Observe(float64(stats.ComputationUsed)) ec.transactionNormalizedTimePerComputation.Observe( - flow.NormalizedExecutionTimePerComputationUnit(dur, compUsed)) - ec.transactionMemoryEstimate.Observe(float64(memoryUsed)) - ec.transactionEmittedEvents.Observe(float64(eventCounts)) - ec.transactionEventSize.Observe(float64(eventSize)) - if failed { + flow.NormalizedExecutionTimePerComputationUnit(dur, stats.ComputationUsed)) + ec.transactionMemoryEstimate.Observe(float64(stats.MemoryUsed)) + ec.transactionEmittedEvents.Observe(float64(stats.EventCounts)) + ec.transactionEventSize.Observe(float64(stats.EventSize)) + if stats.Failed { ec.totalFailedTransactionsCounter.Inc() } } diff --git a/module/metrics/noop.go b/module/metrics/noop.go index 388de4130c1..430132d7391 100644 --- a/module/metrics/noop.go +++ b/module/metrics/noop.go @@ -137,39 +137,40 @@ func (nc *NoopCollector) OnVerifiableChunkReceivedAtVerifierEngine() func (nc *NoopCollector) OnResultApprovalDispatchedInNetworkByVerifier() {} func (nc *NoopCollector) SetMaxChunkDataPackAttemptsForNextUnsealedHeightAtRequester(attempts uint64) { } -func (nc *NoopCollector) OnFinalizedBlockArrivedAtAssigner(height uint64) {} -func (nc *NoopCollector) OnChunksAssignmentDoneAtAssigner(chunks int) {} -func (nc *NoopCollector) OnAssignedChunkProcessedAtAssigner() {} -func (nc *NoopCollector) OnAssignedChunkReceivedAtFetcher() {} -func (nc *NoopCollector) OnChunkDataPackRequestDispatchedInNetworkByRequester() {} -func (nc *NoopCollector) OnChunkDataPackRequestSentByFetcher() {} -func (nc *NoopCollector) OnChunkDataPackRequestReceivedByRequester() {} -func (nc *NoopCollector) OnChunkDataPackArrivedAtFetcher() {} -func (nc *NoopCollector) OnChunkDataPackSentToFetcher() {} -func (nc *NoopCollector) OnVerifiableChunkSentToVerifier() {} -func (nc *NoopCollector) OnBlockConsumerJobDone(uint64) {} -func (nc *NoopCollector) OnChunkConsumerJobDone(uint64) {} -func (nc *NoopCollector) OnChunkDataPackResponseReceivedFromNetworkByRequester() {} -func (nc *NoopCollector) TotalConnectionsInPool(connectionCount uint, connectionPoolSize uint) {} -func (nc *NoopCollector) ConnectionFromPoolReused() {} -func (nc *NoopCollector) ConnectionAddedToPool() {} -func (nc *NoopCollector) NewConnectionEstablished() {} -func (nc *NoopCollector) ConnectionFromPoolInvalidated() {} -func (nc *NoopCollector) ConnectionFromPoolUpdated() {} -func (nc *NoopCollector) ConnectionFromPoolEvicted() {} -func (nc *NoopCollector) StartBlockReceivedToExecuted(blockID flow.Identifier) {} -func (nc *NoopCollector) FinishBlockReceivedToExecuted(blockID flow.Identifier) {} -func (nc *NoopCollector) ExecutionComputationUsedPerBlock(computation uint64) {} -func (nc *NoopCollector) ExecutionStorageStateCommitment(bytes int64) {} -func (nc *NoopCollector) ExecutionCheckpointSize(bytes uint64) {} -func (nc *NoopCollector) ExecutionLastExecutedBlockHeight(height uint64) {} -func (nc *NoopCollector) ExecutionLastFinalizedExecutedBlockHeight(height uint64) {} -func (nc *NoopCollector) ExecutionBlockExecuted(_ time.Duration, _ module.ExecutionResultStats) {} -func (nc *NoopCollector) ExecutionCollectionExecuted(_ time.Duration, _ module.ExecutionResultStats) { +func (nc *NoopCollector) OnFinalizedBlockArrivedAtAssigner(height uint64) {} +func (nc *NoopCollector) 
OnChunksAssignmentDoneAtAssigner(chunks int) {} +func (nc *NoopCollector) OnAssignedChunkProcessedAtAssigner() {} +func (nc *NoopCollector) OnAssignedChunkReceivedAtFetcher() {} +func (nc *NoopCollector) OnChunkDataPackRequestDispatchedInNetworkByRequester() {} +func (nc *NoopCollector) OnChunkDataPackRequestSentByFetcher() {} +func (nc *NoopCollector) OnChunkDataPackRequestReceivedByRequester() {} +func (nc *NoopCollector) OnChunkDataPackArrivedAtFetcher() {} +func (nc *NoopCollector) OnChunkDataPackSentToFetcher() {} +func (nc *NoopCollector) OnVerifiableChunkSentToVerifier() {} +func (nc *NoopCollector) OnBlockConsumerJobDone(uint64) {} +func (nc *NoopCollector) OnChunkConsumerJobDone(uint64) {} +func (nc *NoopCollector) OnChunkDataPackResponseReceivedFromNetworkByRequester() {} +func (nc *NoopCollector) TotalConnectionsInPool(connectionCount uint, connectionPoolSize uint) {} +func (nc *NoopCollector) ConnectionFromPoolReused() {} +func (nc *NoopCollector) ConnectionAddedToPool() {} +func (nc *NoopCollector) NewConnectionEstablished() {} +func (nc *NoopCollector) ConnectionFromPoolInvalidated() {} +func (nc *NoopCollector) ConnectionFromPoolUpdated() {} +func (nc *NoopCollector) ConnectionFromPoolEvicted() {} +func (nc *NoopCollector) StartBlockReceivedToExecuted(blockID flow.Identifier) {} +func (nc *NoopCollector) FinishBlockReceivedToExecuted(blockID flow.Identifier) {} +func (nc *NoopCollector) ExecutionComputationUsedPerBlock(computation uint64) {} +func (nc *NoopCollector) ExecutionStorageStateCommitment(bytes int64) {} +func (nc *NoopCollector) ExecutionCheckpointSize(bytes uint64) {} +func (nc *NoopCollector) ExecutionLastExecutedBlockHeight(height uint64) {} +func (nc *NoopCollector) ExecutionLastFinalizedExecutedBlockHeight(height uint64) {} +func (nc *NoopCollector) ExecutionBlockExecuted(_ time.Duration, _ module.BlockExecutionResultStats) { +} +func (nc *NoopCollector) ExecutionCollectionExecuted(_ time.Duration, _ module.CollectionExecutionResultStats) { } func (nc *NoopCollector) ExecutionBlockExecutionEffortVectorComponent(_ string, _ uint) {} func (nc *NoopCollector) ExecutionBlockCachedPrograms(programs int) {} -func (nc *NoopCollector) ExecutionTransactionExecuted(_ time.Duration, _ int, _, _ uint64, _, _ int, _ bool) { +func (nc *NoopCollector) ExecutionTransactionExecuted(_ time.Duration, stats module.TransactionExecutionResultStats) { } func (nc *NoopCollector) ExecutionChunkDataPackGenerated(_, _ int) {} func (nc *NoopCollector) ExecutionScriptExecuted(dur time.Duration, compUsed, _, _ uint64) {} diff --git a/module/mock/execution_metrics.go b/module/mock/execution_metrics.go index 1ccb6e50364..a543d9c13e3 100644 --- a/module/mock/execution_metrics.go +++ b/module/mock/execution_metrics.go @@ -37,7 +37,7 @@ func (_m *ExecutionMetrics) ExecutionBlockDataUploadStarted() { } // ExecutionBlockExecuted provides a mock function with given fields: dur, stats -func (_m *ExecutionMetrics) ExecutionBlockExecuted(dur time.Duration, stats module.ExecutionResultStats) { +func (_m *ExecutionMetrics) ExecutionBlockExecuted(dur time.Duration, stats module.BlockExecutionResultStats) { _m.Called(dur, stats) } @@ -57,7 +57,7 @@ func (_m *ExecutionMetrics) ExecutionChunkDataPackGenerated(proofSize int, numbe } // ExecutionCollectionExecuted provides a mock function with given fields: dur, stats -func (_m *ExecutionMetrics) ExecutionCollectionExecuted(dur time.Duration, stats module.ExecutionResultStats) { +func (_m *ExecutionMetrics) ExecutionCollectionExecuted(dur time.Duration, 
stats module.CollectionExecutionResultStats) {
	_m.Called(dur, stats)
}

@@ -106,9 +106,9 @@ func (_m *ExecutionMetrics) ExecutionSync(syncing bool) {
	_m.Called(syncing)
}

-// ExecutionTransactionExecuted provides a mock function with given fields: dur, numTxnConflictRetries, compUsed, memoryUsed, eventCounts, eventSize, failed
-func (_m *ExecutionMetrics) ExecutionTransactionExecuted(dur time.Duration, numTxnConflictRetries int, compUsed uint64, memoryUsed uint64, eventCounts int, eventSize int, failed bool) {
-	_m.Called(dur, numTxnConflictRetries, compUsed, memoryUsed, eventCounts, eventSize, failed)
+// ExecutionTransactionExecuted provides a mock function with given fields: dur, stats
+func (_m *ExecutionMetrics) ExecutionTransactionExecuted(dur time.Duration, stats module.TransactionExecutionResultStats) {
+	_m.Called(dur, stats)
}

// FinishBlockReceivedToExecuted provides a mock function with given fields: blockID

From d3af3c6cbde79df90abae09a66197bccb133b050 Mon Sep 17 00:00:00 2001
From: Ardit Marku
Date: Mon, 10 Jun 2024 15:24:52 +0300
Subject: [PATCH 02/19] Take into account gas refunds in EVM.dryRun

To correctly compute the gas estimation with EVM.dryRun, we need to add
any gas refunds back to the used gas. Gas refunds are still required
while the transaction executes; they are only returned to the
transaction author after execution completes.
---
 fvm/evm/emulator/emulator.go |  13 ++-
 fvm/evm/evm_test.go          | 202 +++++++++++++++++++++++++++++++++++
 fvm/evm/types/result.go      |   4 +
 3 files changed, 218 insertions(+), 1 deletion(-)

diff --git a/fvm/evm/emulator/emulator.go b/fvm/evm/emulator/emulator.go
index ee06deb3d01..8d1c7a2cdd9 100644
--- a/fvm/evm/emulator/emulator.go
+++ b/fvm/evm/emulator/emulator.go
@@ -234,7 +234,17 @@ func (bl *BlockView) DryRunTransaction(
 	msg.SkipAccountChecks = true

 	// return without commiting the state
-	return proc.run(msg, tx.Hash(), 0, tx.Type())
+	txResult, err := proc.run(msg, tx.Hash(), 0, tx.Type())
+	if txResult.VMError == nil && txResult.ValidationError == nil {
+		// Adding `gethParams.SstoreSentryGasEIP2200` is needed for this condition:
+		// https://github.com/onflow/go-ethereum/blob/master/core/vm/operations_acl.go#L29-L32
+		txResult.GasConsumed += gethParams.SstoreSentryGasEIP2200
+		// Take into account any gas refunds, which are calculated only after
+		// transaction execution.
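+		// Illustrative numbers (taken from the dry-run tests added below): for
+		// a call that clears a storage slot, proc.run reports UsedGas = 21_873
+		// while the refund counter holds 4_800 (SstoreClearsScheduleRefundEIP3529).
+		// The refund is only credited once execution has finished, so a gas
+		// limit of 21_873 would abort mid-execution; the safe estimate is
+		// 21_873 + 2_300 (sentry) + 4_800 = 28_973.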
+ txResult.GasConsumed += txResult.GasRefund + } + + return txResult, err } func (bl *BlockView) newProcedure() (*procedure, error) { @@ -522,6 +532,7 @@ func (proc *procedure) run( // if prechecks are passed, the exec result won't be nil if execResult != nil { res.GasConsumed = execResult.UsedGas + res.GasRefund = proc.state.GetRefund() res.Index = uint16(txIndex) // we need to capture the returned value no matter the status // if the tx is reverted the error message is returned as returned value diff --git a/fvm/evm/evm_test.go b/fvm/evm/evm_test.go index 12cd3994b24..fb332792cb5 100644 --- a/fvm/evm/evm_test.go +++ b/fvm/evm/evm_test.go @@ -12,6 +12,7 @@ import ( "github.com/onflow/cadence/encoding/ccf" gethTypes "github.com/onflow/go-ethereum/core/types" + "github.com/onflow/go-ethereum/params" "github.com/onflow/go-ethereum/rlp" "github.com/stretchr/testify/assert" @@ -1462,6 +1463,207 @@ func TestDryRun(t *testing.T) { }) }) + t.Run("test dry run storing current value", func(t *testing.T) { + RunWithNewEnvironment(t, + chain, func( + ctx fvm.Context, + vm fvm.VM, + snapshot snapshot.SnapshotTree, + testContract *TestContract, + testAccount *EOATestAccount, + ) { + data := testContract.MakeCallData(t, "store", big.NewInt(0)) + + limit := uint64(math.MaxUint64 - 1) + tx := gethTypes.NewTransaction( + 0, + testContract.DeployedAt.ToCommon(), + big.NewInt(0), + limit, + big.NewInt(0), + data, + ) + result := dryRunTx(t, tx, ctx, vm, snapshot, testContract) + require.Equal(t, types.ErrCodeNoError, result.ErrorCode) + require.Equal(t, types.StatusSuccessful, result.Status) + require.Greater(t, result.GasConsumed, uint64(0)) + require.Equal( + t, + uint64(23873)+params.SstoreSentryGasEIP2200, + result.GasConsumed, + ) + require.Equal(t, uint64(0), result.GasRefund) + }) + }) + + t.Run("test dry run storing new value", func(t *testing.T) { + RunWithNewEnvironment(t, + chain, func( + ctx fvm.Context, + vm fvm.VM, + snapshot snapshot.SnapshotTree, + testContract *TestContract, + testAccount *EOATestAccount, + ) { + sc := systemcontracts.SystemContractsForChain(chain.ChainID()) + code := []byte(fmt.Sprintf( + ` + import EVM from %s + + transaction(tx: [UInt8], coinbaseBytes: [UInt8; 20]){ + prepare(account: &Account) { + let coinbase = EVM.EVMAddress(bytes: coinbaseBytes) + let res = EVM.run(tx: tx, coinbase: coinbase) + + assert(res.status == EVM.Status.successful, message: "unexpected status") + assert(res.errorCode == 0, message: "unexpected error code") + } + } + `, + sc.EVMContract.Address.HexWithPrefix(), + )) + + num := int64(12) + innerTxBytes := testAccount.PrepareSignAndEncodeTx(t, + testContract.DeployedAt.ToCommon(), + testContract.MakeCallData(t, "store", big.NewInt(num)), + big.NewInt(0), + uint64(50_000), + big.NewInt(0), + ) + + innerTx := cadence.NewArray( + ConvertToCadence(innerTxBytes), + ).WithType(stdlib.EVMTransactionBytesCadenceType) + + coinbase := cadence.NewArray( + ConvertToCadence(testAccount.Address().Bytes()), + ).WithType(stdlib.EVMAddressBytesCadenceType) + + tx := fvm.Transaction( + flow.NewTransactionBody(). + SetScript(code). + AddAuthorizer(sc.FlowServiceAccount.Address). + AddArgument(json.MustEncode(innerTx)). 
+ AddArgument(json.MustEncode(coinbase)), + 0) + + _, output, err := vm.Run( + ctx, + tx, + snapshot, + ) + require.NoError(t, err) + require.NoError(t, output.Err) + + data := testContract.MakeCallData(t, "store", big.NewInt(100)) + + tx1 := gethTypes.NewTransaction( + 0, + testContract.DeployedAt.ToCommon(), + big.NewInt(0), + uint64(50_000), + big.NewInt(0), + data, + ) + result := dryRunTx(t, tx1, ctx, vm, snapshot, testContract) + require.Equal(t, types.ErrCodeNoError, result.ErrorCode) + require.Equal(t, types.StatusSuccessful, result.Status) + require.Greater(t, result.GasConsumed, uint64(0)) + require.Equal( + t, + uint64(43785)+params.SstoreSentryGasEIP2200, + result.GasConsumed, + ) + require.Equal(t, uint64(0), result.GasRefund) + }) + }) + + t.Run("test dry run clear current value", func(t *testing.T) { + RunWithNewEnvironment(t, + chain, func( + ctx fvm.Context, + vm fvm.VM, + snapshot snapshot.SnapshotTree, + testContract *TestContract, + testAccount *EOATestAccount, + ) { + sc := systemcontracts.SystemContractsForChain(chain.ChainID()) + code := []byte(fmt.Sprintf( + ` + import EVM from %s + + transaction(tx: [UInt8], coinbaseBytes: [UInt8; 20]){ + prepare(account: &Account) { + let coinbase = EVM.EVMAddress(bytes: coinbaseBytes) + let res = EVM.run(tx: tx, coinbase: coinbase) + + assert(res.status == EVM.Status.successful, message: "unexpected status") + assert(res.errorCode == 0, message: "unexpected error code") + } + } + `, + sc.EVMContract.Address.HexWithPrefix(), + )) + + num := int64(100) + innerTxBytes := testAccount.PrepareSignAndEncodeTx(t, + testContract.DeployedAt.ToCommon(), + testContract.MakeCallData(t, "store", big.NewInt(num)), + big.NewInt(0), + uint64(50_000), + big.NewInt(0), + ) + + innerTx := cadence.NewArray( + ConvertToCadence(innerTxBytes), + ).WithType(stdlib.EVMTransactionBytesCadenceType) + + coinbase := cadence.NewArray( + ConvertToCadence(testAccount.Address().Bytes()), + ).WithType(stdlib.EVMAddressBytesCadenceType) + + tx := fvm.Transaction( + flow.NewTransactionBody(). + SetScript(code). + AddAuthorizer(sc.FlowServiceAccount.Address). + AddArgument(json.MustEncode(innerTx)). + AddArgument(json.MustEncode(coinbase)), + 0) + + state, output, err := vm.Run( + ctx, + tx, + snapshot, + ) + require.NoError(t, err) + require.NoError(t, output.Err) + snapshot = snapshot.Append(state) + + data := testContract.MakeCallData(t, "store", big.NewInt(0)) + + tx1 := gethTypes.NewTransaction( + 0, + testContract.DeployedAt.ToCommon(), + big.NewInt(0), + uint64(50_000), + big.NewInt(0), + data, + ) + result := dryRunTx(t, tx1, ctx, vm, snapshot, testContract) + require.Equal(t, types.ErrCodeNoError, result.ErrorCode) + require.Equal(t, types.StatusSuccessful, result.Status) + require.Greater(t, result.GasConsumed, uint64(0)) + require.Equal( + t, + uint64(21873)+params.SstoreSentryGasEIP2200+params.SstoreClearsScheduleRefundEIP3529, + result.GasConsumed, + ) + // We might want to think about exposing GasRefuned to EVM contract. + // require.Equal(t, uint64(4800), result.GasRefund) + }) + }) + // this test makes sure the dry-run that updates the value on the contract // doesn't persist the change, and after when the value is read it isn't updated. 
t.Run("test dry run for any side-effects", func(t *testing.T) { diff --git a/fvm/evm/types/result.go b/fvm/evm/types/result.go index ac22760464c..3c77a821ad8 100644 --- a/fvm/evm/types/result.go +++ b/fvm/evm/types/result.go @@ -39,6 +39,7 @@ type ResultSummary struct { ErrorCode ErrorCode ErrorMessage string GasConsumed uint64 + GasRefund uint64 DeployedContractAddress *Address ReturnedData Data } @@ -71,6 +72,8 @@ type Result struct { TxType uint8 // total gas consumed during an opeartion GasConsumed uint64 + // total gas refunds after transaction execution + GasRefund uint64 // the address where the contract is deployed (if any) DeployedContractAddress *Address // returned data from a function call @@ -142,6 +145,7 @@ func (res *Result) Receipt() *gethTypes.Receipt { func (res *Result) ResultSummary() *ResultSummary { rs := &ResultSummary{ GasConsumed: res.GasConsumed, + GasRefund: res.GasRefund, DeployedContractAddress: res.DeployedContractAddress, ReturnedData: res.ReturnedData, Status: StatusSuccessful, From b6fc9e1ea349062b7fbbb9d575421f9d213518e2 Mon Sep 17 00:00:00 2001 From: Ardit Marku Date: Tue, 11 Jun 2024 13:00:52 +0300 Subject: [PATCH 03/19] Add the Successful() method on Result and improve tests for EVM.dryRun --- fvm/evm/emulator/emulator.go | 2 +- fvm/evm/evm_test.go | 202 +++++++++++++++++++++++++++++------ fvm/evm/types/result.go | 5 + 3 files changed, 178 insertions(+), 31 deletions(-) diff --git a/fvm/evm/emulator/emulator.go b/fvm/evm/emulator/emulator.go index 8d1c7a2cdd9..b86821e9f7f 100644 --- a/fvm/evm/emulator/emulator.go +++ b/fvm/evm/emulator/emulator.go @@ -235,7 +235,7 @@ func (bl *BlockView) DryRunTransaction( // return without commiting the state txResult, err := proc.run(msg, tx.Hash(), 0, tx.Type()) - if txResult.VMError == nil && txResult.ValidationError == nil { + if txResult.Successful() { // Adding `gethParams.SstoreSentryGasEIP2200` is needed for this condition: // https://github.com/onflow/go-ethereum/blob/master/core/vm/operations_acl.go#L29-L32 txResult.GasConsumed += gethParams.SstoreSentryGasEIP2200 diff --git a/fvm/evm/evm_test.go b/fvm/evm/evm_test.go index fb332792cb5..3346238a30a 100644 --- a/fvm/evm/evm_test.go +++ b/fvm/evm/evm_test.go @@ -12,7 +12,7 @@ import ( "github.com/onflow/cadence/encoding/ccf" gethTypes "github.com/onflow/go-ethereum/core/types" - "github.com/onflow/go-ethereum/params" + gethParams "github.com/onflow/go-ethereum/params" "github.com/onflow/go-ethereum/rlp" "github.com/stretchr/testify/assert" @@ -1463,7 +1463,7 @@ func TestDryRun(t *testing.T) { }) }) - t.Run("test dry run storing current value", func(t *testing.T) { + t.Run("test dry run store current value", func(t *testing.T) { RunWithNewEnvironment(t, chain, func( ctx fvm.Context, @@ -1473,30 +1473,76 @@ func TestDryRun(t *testing.T) { testAccount *EOATestAccount, ) { data := testContract.MakeCallData(t, "store", big.NewInt(0)) - - limit := uint64(math.MaxUint64 - 1) tx := gethTypes.NewTransaction( 0, testContract.DeployedAt.ToCommon(), big.NewInt(0), - limit, + uint64(50_000), big.NewInt(0), data, ) - result := dryRunTx(t, tx, ctx, vm, snapshot, testContract) - require.Equal(t, types.ErrCodeNoError, result.ErrorCode) - require.Equal(t, types.StatusSuccessful, result.Status) - require.Greater(t, result.GasConsumed, uint64(0)) + dryRunResult := dryRunTx(t, tx, ctx, vm, snapshot, testContract) + + require.Equal(t, types.ErrCodeNoError, dryRunResult.ErrorCode) + require.Equal(t, types.StatusSuccessful, dryRunResult.Status) + require.Greater(t, 
dryRunResult.GasConsumed, uint64(0)) + + code := []byte(fmt.Sprintf( + ` + import EVM from %s + access(all) + fun main(tx: [UInt8], coinbaseBytes: [UInt8; 20]): EVM.Result { + let coinbase = EVM.EVMAddress(bytes: coinbaseBytes) + return EVM.run(tx: tx, coinbase: coinbase) + } + `, + evmAddress, + )) + + innerTxBytes := testAccount.PrepareSignAndEncodeTx(t, + testContract.DeployedAt.ToCommon(), + data, + big.NewInt(0), + dryRunResult.GasConsumed, // use the gas estimation from Evm.dryRun + big.NewInt(0), + ) + + innerTx := cadence.NewArray( + ConvertToCadence(innerTxBytes), + ).WithType(stdlib.EVMTransactionBytesCadenceType) + + coinbase := cadence.NewArray( + ConvertToCadence(testAccount.Address().Bytes()), + ).WithType(stdlib.EVMAddressBytesCadenceType) + + script := fvm.Script(code).WithArguments( + json.MustEncode(innerTx), + json.MustEncode(coinbase), + ) + + _, output, err := vm.Run( + ctx, + script, + snapshot) + require.NoError(t, err) + require.NoError(t, output.Err) + + res, err := stdlib.ResultSummaryFromEVMResultValue(output.Value) + require.NoError(t, err) + require.Equal(t, types.StatusSuccessful, res.Status) + require.Equal(t, types.ErrCodeNoError, res.ErrorCode) + // Make sure that gas consumed from `EVM.dryRun` is bigger + // than the actual gas consumption of the equivalent + // `EVM.run`. require.Equal( t, - uint64(23873)+params.SstoreSentryGasEIP2200, - result.GasConsumed, + res.GasConsumed+gethParams.SstoreSentryGasEIP2200, + dryRunResult.GasConsumed, ) - require.Equal(t, uint64(0), result.GasRefund) }) }) - t.Run("test dry run storing new value", func(t *testing.T) { + t.Run("test dry run store new value", func(t *testing.T) { RunWithNewEnvironment(t, chain, func( ctx fvm.Context, @@ -1557,7 +1603,6 @@ func TestDryRun(t *testing.T) { require.NoError(t, output.Err) data := testContract.MakeCallData(t, "store", big.NewInt(100)) - tx1 := gethTypes.NewTransaction( 0, testContract.DeployedAt.ToCommon(), @@ -1566,16 +1611,67 @@ func TestDryRun(t *testing.T) { big.NewInt(0), data, ) - result := dryRunTx(t, tx1, ctx, vm, snapshot, testContract) - require.Equal(t, types.ErrCodeNoError, result.ErrorCode) - require.Equal(t, types.StatusSuccessful, result.Status) - require.Greater(t, result.GasConsumed, uint64(0)) + dryRunResult := dryRunTx(t, tx1, ctx, vm, snapshot, testContract) + + require.Equal(t, types.ErrCodeNoError, dryRunResult.ErrorCode) + require.Equal(t, types.StatusSuccessful, dryRunResult.Status) + require.Greater(t, dryRunResult.GasConsumed, uint64(0)) + + code = []byte(fmt.Sprintf( + ` + import EVM from %s + access(all) + fun main(tx: [UInt8], coinbaseBytes: [UInt8; 20]): EVM.Result { + let coinbase = EVM.EVMAddress(bytes: coinbaseBytes) + return EVM.run(tx: tx, coinbase: coinbase) + } + `, + evmAddress, + )) + + // Decrease nonce because we are Cadence using scripts, and not + // transactions, which means that no state change is happening. 
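+			// Concretely (illustrative): the first PrepareSignAndEncodeTx above
+			// signed with nonce 0 and bumped the helper's local counter to 1,
+			// but the executed state was never appended to the snapshot, so the
+			// EOA's on-chain nonce is still 0. Rolling the counter back lets
+			// the re-signed transaction reuse nonce 0 and stay valid.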
+ testAccount.SetNonce(testAccount.Nonce() - 1) + innerTxBytes = testAccount.PrepareSignAndEncodeTx(t, + testContract.DeployedAt.ToCommon(), + data, + big.NewInt(0), + dryRunResult.GasConsumed, // use the gas estimation from Evm.dryRun + big.NewInt(0), + ) + + innerTx = cadence.NewArray( + ConvertToCadence(innerTxBytes), + ).WithType(stdlib.EVMTransactionBytesCadenceType) + + coinbase = cadence.NewArray( + ConvertToCadence(testAccount.Address().Bytes()), + ).WithType(stdlib.EVMAddressBytesCadenceType) + + script := fvm.Script(code).WithArguments( + json.MustEncode(innerTx), + json.MustEncode(coinbase), + ) + + _, output, err = vm.Run( + ctx, + script, + snapshot) + require.NoError(t, err) + require.NoError(t, output.Err) + + res, err := stdlib.ResultSummaryFromEVMResultValue(output.Value) + require.NoError(t, err) + require.Equal(t, types.StatusSuccessful, res.Status) + require.Equal(t, types.ErrCodeNoError, res.ErrorCode) + // Make sure that gas consumed from `EVM.dryRun` is bigger + // than the actual gas consumption of the equivalent + // `EVM.run`. require.Equal( t, - uint64(43785)+params.SstoreSentryGasEIP2200, - result.GasConsumed, + res.GasConsumed+gethParams.SstoreSentryGasEIP2200, + dryRunResult.GasConsumed, ) - require.Equal(t, uint64(0), result.GasRefund) }) }) @@ -1641,7 +1737,6 @@ func TestDryRun(t *testing.T) { snapshot = snapshot.Append(state) data := testContract.MakeCallData(t, "store", big.NewInt(0)) - tx1 := gethTypes.NewTransaction( 0, testContract.DeployedAt.ToCommon(), @@ -1650,17 +1745,64 @@ func TestDryRun(t *testing.T) { big.NewInt(0), data, ) - result := dryRunTx(t, tx1, ctx, vm, snapshot, testContract) - require.Equal(t, types.ErrCodeNoError, result.ErrorCode) - require.Equal(t, types.StatusSuccessful, result.Status) - require.Greater(t, result.GasConsumed, uint64(0)) + dryRunResult := dryRunTx(t, tx1, ctx, vm, snapshot, testContract) + + require.Equal(t, types.ErrCodeNoError, dryRunResult.ErrorCode) + require.Equal(t, types.StatusSuccessful, dryRunResult.Status) + require.Greater(t, dryRunResult.GasConsumed, uint64(0)) + + code = []byte(fmt.Sprintf( + ` + import EVM from %s + access(all) + fun main(tx: [UInt8], coinbaseBytes: [UInt8; 20]): EVM.Result { + let coinbase = EVM.EVMAddress(bytes: coinbaseBytes) + return EVM.run(tx: tx, coinbase: coinbase) + } + `, + evmAddress, + )) + + innerTxBytes = testAccount.PrepareSignAndEncodeTx(t, + testContract.DeployedAt.ToCommon(), + data, + big.NewInt(0), + dryRunResult.GasConsumed, // use the gas estimation from Evm.dryRun + big.NewInt(0), + ) + + innerTx = cadence.NewArray( + ConvertToCadence(innerTxBytes), + ).WithType(stdlib.EVMTransactionBytesCadenceType) + + coinbase = cadence.NewArray( + ConvertToCadence(testAccount.Address().Bytes()), + ).WithType(stdlib.EVMAddressBytesCadenceType) + + script := fvm.Script(code).WithArguments( + json.MustEncode(innerTx), + json.MustEncode(coinbase), + ) + + _, output, err = vm.Run( + ctx, + script, + snapshot) + require.NoError(t, err) + require.NoError(t, output.Err) + + res, err := stdlib.ResultSummaryFromEVMResultValue(output.Value) + require.NoError(t, err) + //require.Equal(t, types.StatusSuccessful, res.Status) + require.Equal(t, types.ErrCodeNoError, res.ErrorCode) + // Make sure that gas consumed from `EVM.dryRun` is bigger + // than the actual gas consumption of the equivalent + // `EVM.run`. 
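+			// i.e. the delta is exactly SstoreSentryGasEIP2200 (2_300) plus
+			// SstoreClearsScheduleRefundEIP3529 (4_800), the refund accrued by
+			// clearing the slot, which dryRun added back on top of the sentry gas.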
require.Equal( t, - uint64(21873)+params.SstoreSentryGasEIP2200+params.SstoreClearsScheduleRefundEIP3529, - result.GasConsumed, + res.GasConsumed+gethParams.SstoreSentryGasEIP2200+gethParams.SstoreClearsScheduleRefundEIP3529, + dryRunResult.GasConsumed, ) - // We might want to think about exposing GasRefuned to EVM contract. - // require.Equal(t, uint64(4800), result.GasRefund) }) }) diff --git a/fvm/evm/types/result.go b/fvm/evm/types/result.go index 3c77a821ad8..aba686c1a9f 100644 --- a/fvm/evm/types/result.go +++ b/fvm/evm/types/result.go @@ -96,6 +96,11 @@ func (res *Result) Failed() bool { return res.VMError != nil } +// Successful returns true if transaction has been executed without any errors +func (res *Result) Successful() bool { + return !res.Failed() && !res.Invalid() +} + // SetValidationError sets the validation error // and also sets the gas used to the fixed invalid gas usage func (res *Result) SetValidationError(err error) { From a96af388b551e3b7211d7d0d3d79bd839798d6fb Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Wed, 5 Jun 2024 10:56:48 -0700 Subject: [PATCH 04/19] update chunk data pack storage interface --- storage/chunkDataPacks.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/storage/chunkDataPacks.go b/storage/chunkDataPacks.go index d9f49735d90..a17acaf0e86 100644 --- a/storage/chunkDataPacks.go +++ b/storage/chunkDataPacks.go @@ -15,9 +15,6 @@ type ChunkDataPacks interface { // No errors are expected during normal operation, but it may return generic error Remove(cs []flow.Identifier) error - // BatchStore inserts the chunk header, keyed by chunk ID into a given batch - BatchStore(c *flow.ChunkDataPack, batch BatchStorage) error - // ByChunkID returns the chunk data for the given a chunk ID. ByChunkID(chunkID flow.Identifier) (*flow.ChunkDataPack, error) From d907833726e22b27bbf234231163362f427b8eda Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Wed, 5 Jun 2024 11:59:46 -0700 Subject: [PATCH 05/19] move StoredChunkDataPack to storage package --- storage/badger/chunkDataPacks.go | 23 +++++++------- storage/badger/model/storedChunkDataPack.go | 17 ---------- storage/badger/operation/chunkDataPacks.go | 8 ++--- .../badger/operation/chunkDataPacks_test.go | 10 +++--- storage/chunkDataPacks.go | 31 +++++++++++++++++++ 5 files changed, 51 insertions(+), 38 deletions(-) delete mode 100644 storage/badger/model/storedChunkDataPack.go diff --git a/storage/badger/chunkDataPacks.go b/storage/badger/chunkDataPacks.go index 63865a574ce..7d08aef934f 100644 --- a/storage/badger/chunkDataPacks.go +++ b/storage/badger/chunkDataPacks.go @@ -9,7 +9,6 @@ import ( "github.com/onflow/flow-go/module" "github.com/onflow/flow-go/module/metrics" "github.com/onflow/flow-go/storage" - badgermodel "github.com/onflow/flow-go/storage/badger/model" "github.com/onflow/flow-go/storage/badger/operation" "github.com/onflow/flow-go/storage/badger/transaction" ) @@ -17,25 +16,25 @@ import ( type ChunkDataPacks struct { db *badger.DB collections storage.Collections - byChunkIDCache *Cache[flow.Identifier, *badgermodel.StoredChunkDataPack] + byChunkIDCache *Cache[flow.Identifier, *storage.StoredChunkDataPack] } func NewChunkDataPacks(collector module.CacheMetrics, db *badger.DB, collections storage.Collections, byChunkIDCacheSize uint) *ChunkDataPacks { - store := func(key flow.Identifier, val *badgermodel.StoredChunkDataPack) func(*transaction.Tx) error { + store := func(key flow.Identifier, val *storage.StoredChunkDataPack) func(*transaction.Tx) error { return 
transaction.WithTx(operation.SkipDuplicates(operation.InsertChunkDataPack(val))) } - retrieve := func(key flow.Identifier) func(tx *badger.Txn) (*badgermodel.StoredChunkDataPack, error) { - return func(tx *badger.Txn) (*badgermodel.StoredChunkDataPack, error) { - var c badgermodel.StoredChunkDataPack + retrieve := func(key flow.Identifier) func(tx *badger.Txn) (*storage.StoredChunkDataPack, error) { + return func(tx *badger.Txn) (*storage.StoredChunkDataPack, error) { + var c storage.StoredChunkDataPack err := operation.RetrieveChunkDataPack(key, &c)(tx) return &c, err } } cache := newCache(collector, metrics.ResourceChunkDataPack, - withLimit[flow.Identifier, *badgermodel.StoredChunkDataPack](byChunkIDCacheSize), + withLimit[flow.Identifier, *storage.StoredChunkDataPack](byChunkIDCacheSize), withStore(store), withRetrieve(retrieve), ) @@ -133,7 +132,7 @@ func (ch *ChunkDataPacks) ByChunkID(chunkID flow.Identifier) (*flow.ChunkDataPac return chdp, nil } -func (ch *ChunkDataPacks) byChunkID(chunkID flow.Identifier) (*badgermodel.StoredChunkDataPack, error) { +func (ch *ChunkDataPacks) byChunkID(chunkID flow.Identifier) (*storage.StoredChunkDataPack, error) { tx := ch.db.NewTransaction(false) defer tx.Discard() @@ -145,8 +144,8 @@ func (ch *ChunkDataPacks) byChunkID(chunkID flow.Identifier) (*badgermodel.Store return schdp, nil } -func (ch *ChunkDataPacks) retrieveCHDP(chunkID flow.Identifier) func(*badger.Txn) (*badgermodel.StoredChunkDataPack, error) { - return func(tx *badger.Txn) (*badgermodel.StoredChunkDataPack, error) { +func (ch *ChunkDataPacks) retrieveCHDP(chunkID flow.Identifier) func(*badger.Txn) (*storage.StoredChunkDataPack, error) { + return func(tx *badger.Txn) (*storage.StoredChunkDataPack, error) { val, err := ch.byChunkIDCache.Get(chunkID)(tx) if err != nil { return nil, err @@ -155,8 +154,8 @@ func (ch *ChunkDataPacks) retrieveCHDP(chunkID flow.Identifier) func(*badger.Txn } } -func toStoredChunkDataPack(c *flow.ChunkDataPack) *badgermodel.StoredChunkDataPack { - sc := &badgermodel.StoredChunkDataPack{ +func toStoredChunkDataPack(c *flow.ChunkDataPack) *storage.StoredChunkDataPack { + sc := &storage.StoredChunkDataPack{ ChunkID: c.ChunkID, StartState: c.StartState, Proof: c.Proof, diff --git a/storage/badger/model/storedChunkDataPack.go b/storage/badger/model/storedChunkDataPack.go deleted file mode 100644 index 31349604070..00000000000 --- a/storage/badger/model/storedChunkDataPack.go +++ /dev/null @@ -1,17 +0,0 @@ -package badgermodel - -import ( - "github.com/onflow/flow-go/model/flow" -) - -// StoredChunkDataPack is an in-storage representation of chunk data pack. -// Its prime difference is instead of an actual collection, it keeps a collection ID hence relying on maintaining -// the collection on a secondary storage. -type StoredChunkDataPack struct { - ChunkID flow.Identifier - StartState flow.StateCommitment - Proof flow.StorageProof - CollectionID flow.Identifier - SystemChunk bool - ExecutionDataRoot flow.BlockExecutionDataRoot -} diff --git a/storage/badger/operation/chunkDataPacks.go b/storage/badger/operation/chunkDataPacks.go index 687712985d4..e0f2deb2ce2 100644 --- a/storage/badger/operation/chunkDataPacks.go +++ b/storage/badger/operation/chunkDataPacks.go @@ -4,16 +4,16 @@ import ( "github.com/dgraph-io/badger/v2" "github.com/onflow/flow-go/model/flow" - badgermodel "github.com/onflow/flow-go/storage/badger/model" + "github.com/onflow/flow-go/storage" ) // InsertChunkDataPack inserts a chunk data pack keyed by chunk ID. 
-func InsertChunkDataPack(c *badgermodel.StoredChunkDataPack) func(*badger.Txn) error { +func InsertChunkDataPack(c *storage.StoredChunkDataPack) func(*badger.Txn) error { return insert(makePrefix(codeChunkDataPack, c.ChunkID), c) } // BatchInsertChunkDataPack inserts a chunk data pack keyed by chunk ID into a batch -func BatchInsertChunkDataPack(c *badgermodel.StoredChunkDataPack) func(batch *badger.WriteBatch) error { +func BatchInsertChunkDataPack(c *storage.StoredChunkDataPack) func(batch *badger.WriteBatch) error { return batchWrite(makePrefix(codeChunkDataPack, c.ChunkID), c) } @@ -25,7 +25,7 @@ func BatchRemoveChunkDataPack(chunkID flow.Identifier) func(batch *badger.WriteB } // RetrieveChunkDataPack retrieves a chunk data pack by chunk ID. -func RetrieveChunkDataPack(chunkID flow.Identifier, c *badgermodel.StoredChunkDataPack) func(*badger.Txn) error { +func RetrieveChunkDataPack(chunkID flow.Identifier, c *storage.StoredChunkDataPack) func(*badger.Txn) error { return retrieve(makePrefix(codeChunkDataPack, chunkID), c) } diff --git a/storage/badger/operation/chunkDataPacks_test.go b/storage/badger/operation/chunkDataPacks_test.go index 0dc79ef7266..f3a90af8d00 100644 --- a/storage/badger/operation/chunkDataPacks_test.go +++ b/storage/badger/operation/chunkDataPacks_test.go @@ -7,14 +7,14 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - storagemodel "github.com/onflow/flow-go/storage/badger/model" + "github.com/onflow/flow-go/storage" "github.com/onflow/flow-go/utils/unittest" ) func TestChunkDataPack(t *testing.T) { unittest.RunWithBadgerDB(t, func(db *badger.DB) { collectionID := unittest.IdentifierFixture() - expected := &storagemodel.StoredChunkDataPack{ + expected := &storage.StoredChunkDataPack{ ChunkID: unittest.IdentifierFixture(), StartState: unittest.StateCommitmentFixture(), Proof: []byte{'p'}, @@ -22,7 +22,7 @@ func TestChunkDataPack(t *testing.T) { } t.Run("Retrieve non-existent", func(t *testing.T) { - var actual storagemodel.StoredChunkDataPack + var actual storage.StoredChunkDataPack err := db.View(RetrieveChunkDataPack(expected.ChunkID, &actual)) assert.Error(t, err) }) @@ -31,7 +31,7 @@ func TestChunkDataPack(t *testing.T) { err := db.Update(InsertChunkDataPack(expected)) require.NoError(t, err) - var actual storagemodel.StoredChunkDataPack + var actual storage.StoredChunkDataPack err = db.View(RetrieveChunkDataPack(expected.ChunkID, &actual)) assert.NoError(t, err) @@ -42,7 +42,7 @@ func TestChunkDataPack(t *testing.T) { err := db.Update(RemoveChunkDataPack(expected.ChunkID)) require.NoError(t, err) - var actual storagemodel.StoredChunkDataPack + var actual storage.StoredChunkDataPack err = db.View(RetrieveChunkDataPack(expected.ChunkID, &actual)) assert.Error(t, err) }) diff --git a/storage/chunkDataPacks.go b/storage/chunkDataPacks.go index a17acaf0e86..60d4fe2375d 100644 --- a/storage/chunkDataPacks.go +++ b/storage/chunkDataPacks.go @@ -23,3 +23,34 @@ type ChunkDataPacks interface { // If Badger unexpectedly fails to process the request, the error is wrapped in a generic error and returned. BatchRemove(chunkID flow.Identifier, batch BatchStorage) error } + +// StoredChunkDataPack is an in-storage representation of chunk data pack. +// Its prime difference is instead of an actual collection, it keeps a collection ID hence relying on maintaining +// the collection on a secondary storage. 
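+// For example (illustrative): a chunk data pack carrying collection C is
+// stored as {CollectionID: C.ID(), SystemChunk: false}, whereas the system
+// chunk, which has no collection, is stored with a zero CollectionID and
+// SystemChunk: true; readers such as ChunkDataPacks.ByChunkID re-attach the
+// collection from the Collections store.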
+type StoredChunkDataPack struct { + ChunkID flow.Identifier + StartState flow.StateCommitment + Proof flow.StorageProof + CollectionID flow.Identifier + SystemChunk bool + ExecutionDataRoot flow.BlockExecutionDataRoot +} + +func ToStoredChunkDataPack(c *flow.ChunkDataPack) *StoredChunkDataPack { + sc := &StoredChunkDataPack{ + ChunkID: c.ChunkID, + StartState: c.StartState, + Proof: c.Proof, + SystemChunk: false, + ExecutionDataRoot: c.ExecutionDataRoot, + } + + if c.Collection != nil { + // non system chunk + sc.CollectionID = c.Collection.ID() + } else { + sc.SystemChunk = true + } + + return sc +} From 14dcad5e8f45044d75cfc410462ccaf98846dd07 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Wed, 5 Jun 2024 15:55:16 -0700 Subject: [PATCH 06/19] implement pebble chunk data pack --- model/flow/chunk.go | 51 ++++++++++ storage/badger/chunkDataPacks.go | 21 +--- storage/pebble/chunk_data_packs.go | 128 ++++++++++++++++++++++++ storage/pebble/chunk_data_packs_test.go | 90 +++++++++++++++++ 4 files changed, 270 insertions(+), 20 deletions(-) create mode 100644 storage/pebble/chunk_data_packs.go create mode 100644 storage/pebble/chunk_data_packs_test.go diff --git a/model/flow/chunk.go b/model/flow/chunk.go index e8aeafea8bf..90fd760c0f4 100644 --- a/model/flow/chunk.go +++ b/model/flow/chunk.go @@ -4,6 +4,7 @@ import ( "log" "github.com/ipfs/go-cid" + "github.com/vmihailenco/msgpack/v4" ) var EmptyEventCollectionID Identifier @@ -203,3 +204,53 @@ type BlockExecutionDataRoot struct { // associated with this block. ChunkExecutionDataIDs []cid.Cid } + +// MarshalMsgpack implements the msgpack.Marshaler interface +func (b BlockExecutionDataRoot) MarshalMsgpack() ([]byte, error) { + return msgpack.Marshal(struct { + BlockID Identifier + ChunkExecutionDataIDs []string + }{ + BlockID: b.BlockID, + ChunkExecutionDataIDs: cidsToStrings(b.ChunkExecutionDataIDs), + }) +} + +// UnmarshalMsgpack implements the msgpack.Unmarshaler interface +func (b *BlockExecutionDataRoot) UnmarshalMsgpack(data []byte) error { + var temp struct { + BlockID Identifier + ChunkExecutionDataIDs []string + } + + if err := msgpack.Unmarshal(data, &temp); err != nil { + return err + } + + b.BlockID = temp.BlockID + b.ChunkExecutionDataIDs = stringsToCids(temp.ChunkExecutionDataIDs) + + return nil +} + +// Helper function to convert a slice of cid.Cid to a slice of strings +func cidsToStrings(cids []cid.Cid) []string { + strs := make([]string, len(cids)) + for i, c := range cids { + strs[i] = c.String() + } + return strs +} + +// Helper function to convert a slice of strings to a slice of cid.Cid +func stringsToCids(strs []string) []cid.Cid { + cids := make([]cid.Cid, len(strs)) + for i, s := range strs { + c, err := cid.Decode(s) + if err != nil { + panic(err) // Handle error appropriately in real code + } + cids[i] = c + } + return cids +} diff --git a/storage/badger/chunkDataPacks.go b/storage/badger/chunkDataPacks.go index 7d08aef934f..05f42cf7856 100644 --- a/storage/badger/chunkDataPacks.go +++ b/storage/badger/chunkDataPacks.go @@ -70,7 +70,7 @@ func (ch *ChunkDataPacks) Remove(chunkIDs []flow.Identifier) error { // No errors are expected during normal operation, but it may return generic error // if entity is not serializable or Badger unexpectedly fails to process request func (ch *ChunkDataPacks) BatchStore(c *flow.ChunkDataPack, batch storage.BatchStorage) error { - sc := toStoredChunkDataPack(c) + sc := storage.ToStoredChunkDataPack(c) writeBatch := batch.GetWriter() batch.OnSucceed(func() { 
ch.byChunkIDCache.Insert(sc.ChunkID, sc) @@ -153,22 +153,3 @@ func (ch *ChunkDataPacks) retrieveCHDP(chunkID flow.Identifier) func(*badger.Txn return val, nil } } - -func toStoredChunkDataPack(c *flow.ChunkDataPack) *storage.StoredChunkDataPack { - sc := &storage.StoredChunkDataPack{ - ChunkID: c.ChunkID, - StartState: c.StartState, - Proof: c.Proof, - SystemChunk: false, - ExecutionDataRoot: c.ExecutionDataRoot, - } - - if c.Collection != nil { - // non system chunk - sc.CollectionID = c.Collection.ID() - } else { - sc.SystemChunk = true - } - - return sc -} diff --git a/storage/pebble/chunk_data_packs.go b/storage/pebble/chunk_data_packs.go new file mode 100644 index 00000000000..4573db65e79 --- /dev/null +++ b/storage/pebble/chunk_data_packs.go @@ -0,0 +1,128 @@ +package pebble + +import ( + "fmt" + + "github.com/cockroachdb/pebble" + "github.com/vmihailenco/msgpack" + + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/module/irrecoverable" + "github.com/onflow/flow-go/storage" +) + +type ChunkDataPacks struct { + db *pebble.DB + collections storage.Collections +} + +var _ storage.ChunkDataPacks = (*ChunkDataPacks)(nil) + +func NewChunkDataPacks(db *pebble.DB, collections storage.Collections) *ChunkDataPacks { + return &ChunkDataPacks{ + db: db, + collections: collections, + } +} + +func (ch *ChunkDataPacks) Store(cs []*flow.ChunkDataPack) error { + batch := ch.db.NewBatch() + defer batch.Close() + for _, c := range cs { + err := ch.batchStore(c, batch) + if err != nil { + return fmt.Errorf("cannot store chunk data pack: %w", err) + } + } + + err := batch.Commit(pebble.Sync) + if err != nil { + return fmt.Errorf("cannot commit batch: %w", err) + } + + return nil +} + +func (ch *ChunkDataPacks) Remove(cs []flow.Identifier) error { + return nil +} + +func (ch *ChunkDataPacks) ByChunkID(chunkID flow.Identifier) (*flow.ChunkDataPack, error) { + var sc storage.StoredChunkDataPack + err := RetrieveChunkDataPack(ch.db, chunkID, &sc) + if err != nil { + return nil, fmt.Errorf("could not retrieve stored chunk data pack: %w", err) + } + + chdp := &flow.ChunkDataPack{ + ChunkID: sc.ChunkID, + StartState: sc.StartState, + Proof: sc.Proof, + Collection: nil, // to be filled in later + ExecutionDataRoot: sc.ExecutionDataRoot, + } + if !sc.SystemChunk { + collection, err := ch.collections.ByID(sc.CollectionID) + if err != nil { + return nil, fmt.Errorf("could not retrive collection (id: %x) for stored chunk data pack: %w", sc.CollectionID, err) + } + + chdp.Collection = collection + } + return chdp, nil +} + +func (ch *ChunkDataPacks) BatchRemove(chunkID flow.Identifier, batch storage.BatchStorage) error { + return nil +} + +func (ch *ChunkDataPacks) batchStore(c *flow.ChunkDataPack, batch *pebble.Batch) error { + sc := storage.ToStoredChunkDataPack(c) + return InsertChunkDataPack(batch, sc) +} + +func InsertChunkDataPack(batch *pebble.Batch, sc *storage.StoredChunkDataPack) error { + key := makeKey(codeChunkDataPack, sc.ChunkID) + return batchWrite(batch, key, sc) +} + +func RetrieveChunkDataPack(db *pebble.DB, chunkID flow.Identifier, sc *storage.StoredChunkDataPack) error { + key := makeKey(codeChunkDataPack, chunkID) + return retrieve(db, key, sc) +} + +func batchWrite(batch *pebble.Batch, key []byte, val interface{}) error { + value, err := msgpack.Marshal(val) + if err != nil { + return irrecoverable.NewExceptionf("failed to encode value: %w", err) + } + + err = batch.Set(key, value, nil) + if err != nil { + return irrecoverable.NewExceptionf("failed to store data: %w", err) + 
} + + return nil +} + +func retrieve(db *pebble.DB, key []byte, sc interface{}) error { + val, closer, err := db.Get(key) + if err != nil { + return convertNotFoundError(err) + } + defer closer.Close() + + err = msgpack.Unmarshal(val, &sc) + if err != nil { + return irrecoverable.NewExceptionf("failed to decode value: %w", err) + } + return nil +} + +const ( + codeChunkDataPack = 100 +) + +func makeKey(prefix byte, chunkID flow.Identifier) []byte { + return append([]byte{prefix}, chunkID[:]...) +} diff --git a/storage/pebble/chunk_data_packs_test.go b/storage/pebble/chunk_data_packs_test.go new file mode 100644 index 00000000000..79d6c5bb908 --- /dev/null +++ b/storage/pebble/chunk_data_packs_test.go @@ -0,0 +1,90 @@ +package pebble + +import ( + "path/filepath" + "testing" + + "github.com/cockroachdb/pebble" + "github.com/dgraph-io/badger/v2" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/vmihailenco/msgpack" + + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/module/metrics" + "github.com/onflow/flow-go/storage" + badgerstorage "github.com/onflow/flow-go/storage/badger" + "github.com/onflow/flow-go/utils/unittest" +) + +func TestMsgPacks(t *testing.T) { + chunkDataPacks := unittest.ChunkDataPacksFixture(10) + for _, chunkDataPack := range chunkDataPacks { + sc := storage.ToStoredChunkDataPack(chunkDataPack) + value, err := msgpack.Marshal(sc) + require.NoError(t, err) + + var actual storage.StoredChunkDataPack + err = msgpack.Unmarshal(value, &actual) + require.NoError(t, err) + + require.Equal(t, *sc, actual) + } +} + +// TestChunkDataPacks_Store evaluates correct storage and retrieval of chunk data packs in the storage. +// It also evaluates that re-inserting is idempotent. +func TestChunkDataPacks_Store(t *testing.T) { + WithChunkDataPacks(t, 100, func(t *testing.T, chunkDataPacks []*flow.ChunkDataPack, chunkDataPackStore *ChunkDataPacks, _ *pebble.DB) { + require.NoError(t, chunkDataPackStore.Store(chunkDataPacks)) + require.NoError(t, chunkDataPackStore.Store(chunkDataPacks)) + }) +} + +// WithChunkDataPacks is a test helper that generates specified number of chunk data packs, store them using the storeFunc, and +// then evaluates whether they are successfully retrieved from storage. +func WithChunkDataPacks(t *testing.T, chunks int, storeFunc func(*testing.T, []*flow.ChunkDataPack, *ChunkDataPacks, *pebble.DB)) { + RunWithBadgerDBAndPebbleDB(t, func(badgerDB *badger.DB, db *pebble.DB) { + transactions := badgerstorage.NewTransactions(&metrics.NoopCollector{}, badgerDB) + collections := badgerstorage.NewCollections(badgerDB, transactions) + // keep the cache size at 1 to make sure that entries are written and read from storage itself. + store := NewChunkDataPacks(db, collections) + + chunkDataPacks := unittest.ChunkDataPacksFixture(chunks) + for _, chunkDataPack := range chunkDataPacks { + // stores collection in Collections storage (which ChunkDataPacks store uses internally) + err := collections.Store(chunkDataPack.Collection) + require.NoError(t, err) + } + + // stores chunk data packs in the memory using provided store function. + storeFunc(t, chunkDataPacks, store, db) + + // stored chunk data packs should be retrieved successfully. 
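+	// (Each ByChunkID read below resolves CollectionID back through the badger
+	// Collections store, so the equality check exercises both the pebble
+	// round-trip and the collection re-attachment.)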
+ for _, expected := range chunkDataPacks { + actual, err := store.ByChunkID(expected.ChunkID) + require.NoError(t, err) + + assert.Equal(t, expected, actual) + } + }) +} + +func RunWithBadgerDBAndPebbleDB(t *testing.T, fn func(*badger.DB, *pebble.DB)) { + unittest.RunWithTempDir(t, func(dir string) { + badgerDB := unittest.BadgerDB(t, filepath.Join(dir, "badger")) + defer func() { + require.NoError(t, badgerDB.Close()) + }() + + cache := pebble.NewCache(1 << 20) + defer cache.Unref() + // currently pebble is only used for registers + opts := DefaultPebbleOptions(cache, pebble.DefaultComparer) + pebbledb, err := pebble.Open(filepath.Join(dir, "pebble"), opts) + require.NoError(t, err) + defer pebbledb.Close() + + fn(badgerDB, pebbledb) + }) +} From fc774476f32d8d095cbc74faa33dd7bb07035a00 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Wed, 5 Jun 2024 16:13:48 -0700 Subject: [PATCH 07/19] use pebble based chunk data pack storage --- cmd/execution_builder.go | 59 ++++++++++++++++++++++++---------------- 1 file changed, 35 insertions(+), 24 deletions(-) diff --git a/cmd/execution_builder.go b/cmd/execution_builder.go index 3841f70f5f5..ec7dbdbe9a2 100644 --- a/cmd/execution_builder.go +++ b/cmd/execution_builder.go @@ -13,7 +13,7 @@ import ( awsconfig "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/service/s3" - badgerDB "github.com/dgraph-io/badger/v2" + "github.com/cockroachdb/pebble" "github.com/ipfs/boxo/bitswap" "github.com/ipfs/go-cid" badger "github.com/ipfs/go-ds-badger2" @@ -94,7 +94,6 @@ import ( storage "github.com/onflow/flow-go/storage/badger" "github.com/onflow/flow-go/storage/badger/procedure" storagepebble "github.com/onflow/flow-go/storage/pebble" - sutil "github.com/onflow/flow-go/storage/util" ) const ( @@ -691,28 +690,39 @@ func (exeNode *ExecutionNode) LoadExecutionDataGetter(node *NodeConfig) error { return nil } -func openChunkDataPackDB(dbPath string, logger zerolog.Logger) (*badgerDB.DB, error) { - log := sutil.NewLogger(logger) - - opts := badgerDB. - DefaultOptions(dbPath). - WithKeepL0InMemory(true). - WithLogger(log). - - // the ValueLogFileSize option specifies how big the value of a - // key-value pair is allowed to be saved into badger. - // exceeding this limit, will fail with an error like this: - // could not store data: Value with size exceeded 1073741824 limit - // Maximum value size is 10G, needed by execution node - // TODO: finding a better max value for each node type - WithValueLogFileSize(256 << 23). - WithValueLogMaxEntries(100000) // Default is 1000000 - - db, err := badgerDB.Open(opts) +// func openChunkDataPackDB(dbPath string, logger zerolog.Logger) (*badgerDB.DB, error) { +// log := sutil.NewLogger(logger) +// +// opts := badgerDB. +// DefaultOptions(dbPath). +// WithKeepL0InMemory(true). +// WithLogger(log). +// +// // the ValueLogFileSize option specifies how big the value of a +// // key-value pair is allowed to be saved into badger. +// // exceeding this limit, will fail with an error like this: +// // could not store data: Value with size exceeded 1073741824 limit +// // Maximum value size is 10G, needed by execution node +// // TODO: finding a better max value for each node type +// WithValueLogFileSize(256 << 23). 
+// WithValueLogMaxEntries(100000) // Default is 1000000 +// +// db, err := badgerDB.Open(opts) +// if err != nil { +// return nil, fmt.Errorf("could not open chunk data pack badger db at path %v: %w", dbPath, err) +// } +// return db, nil +// } + +func openPebbleChunkDataPackDB(dbPath string, logger zerolog.Logger) (*pebble.DB, error) { + cache := pebble.NewCache(1 << 20) + defer cache.Unref() + opts := storagepebble.DefaultPebbleOptions(cache, pebble.DefaultComparer) + pebbledb, err := pebble.Open(dbPath, opts) if err != nil { - return nil, fmt.Errorf("could not open chunk data pack badger db at path %v: %w", dbPath, err) + return nil, fmt.Errorf("could not open chunk data pack pebble db at path %v: %w", dbPath, err) } - return db, nil + return pebbledb, nil } func (exeNode *ExecutionNode) LoadExecutionState( @@ -722,7 +732,7 @@ func (exeNode *ExecutionNode) LoadExecutionState( error, ) { - chunkDataPackDB, err := openChunkDataPackDB(exeNode.exeConf.chunkDataPackDir, node.Logger) + chunkDataPackDB, err := openPebbleChunkDataPackDB(exeNode.exeConf.chunkDataPackDir, node.Logger) if err != nil { return nil, err } @@ -732,7 +742,8 @@ func (exeNode *ExecutionNode) LoadExecutionState( } return nil }) - chunkDataPacks := storage.NewChunkDataPacks(node.Metrics.Cache, chunkDataPackDB, node.Storage.Collections, exeNode.exeConf.chunkDataPackCacheSize) + // chunkDataPacks := storage.NewChunkDataPacks(node.Metrics.Cache, chunkDataPackDB, node.Storage.Collections, exeNode.exeConf.chunkDataPackCacheSize) + chunkDataPacks := storagepebble.NewChunkDataPacks(chunkDataPackDB, node.Storage.Collections) // Needed for gRPC server, make sure to assign to main scoped vars exeNode.events = storage.NewEvents(node.Metrics.Cache, node.DB) From b511b0a6f1097139d138ad706c040cfb4ccd326c Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Thu, 6 Jun 2024 15:44:48 -0700 Subject: [PATCH 08/19] tmp --- cmd/execution_builder.go | 17 +++-------------- storage/pebble/chunk_data_packs.go | 28 ++++++++++++++++++++++++++-- storage/pebble/open.go | 16 +++++++++++++++- 3 files changed, 44 insertions(+), 17 deletions(-) diff --git a/cmd/execution_builder.go b/cmd/execution_builder.go index ec7dbdbe9a2..5dcb36fe20f 100644 --- a/cmd/execution_builder.go +++ b/cmd/execution_builder.go @@ -13,7 +13,6 @@ import ( awsconfig "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/service/s3" - "github.com/cockroachdb/pebble" "github.com/ipfs/boxo/bitswap" "github.com/ipfs/go-cid" badger "github.com/ipfs/go-ds-badger2" @@ -714,17 +713,6 @@ func (exeNode *ExecutionNode) LoadExecutionDataGetter(node *NodeConfig) error { // return db, nil // } -func openPebbleChunkDataPackDB(dbPath string, logger zerolog.Logger) (*pebble.DB, error) { - cache := pebble.NewCache(1 << 20) - defer cache.Unref() - opts := storagepebble.DefaultPebbleOptions(cache, pebble.DefaultComparer) - pebbledb, err := pebble.Open(dbPath, opts) - if err != nil { - return nil, fmt.Errorf("could not open chunk data pack pebble db at path %v: %w", dbPath, err) - } - return pebbledb, nil -} - func (exeNode *ExecutionNode) LoadExecutionState( node *NodeConfig, ) ( @@ -732,10 +720,11 @@ func (exeNode *ExecutionNode) LoadExecutionState( error, ) { - chunkDataPackDB, err := openPebbleChunkDataPackDB(exeNode.exeConf.chunkDataPackDir, node.Logger) + chunkDataPackDB, err := storagepebble.OpenDefaultPebbleDB(exeNode.exeConf.chunkDataPackDir) if err != nil { - return nil, err + return nil, fmt.Errorf("could not open chunk data pack database: %w", err) } + 
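+	// OpenDefaultPebbleDB manages its pebble block cache internally, so the
+	// only cleanup the caller owes is closing the returned DB. A minimal
+	// usage sketch (dir being any writable path):
+	//
+	//	db, err := storagepebble.OpenDefaultPebbleDB(dir)
+	//	if err != nil {
+	//		return err
+	//	}
+	//	defer db.Close()
+	//
+	// Here, closing is instead deferred to node shutdown via the hook below.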
exeNode.builder.ShutdownFunc(func() error { if err := chunkDataPackDB.Close(); err != nil { return fmt.Errorf("error closing chunk data pack database: %w", err) diff --git a/storage/pebble/chunk_data_packs.go b/storage/pebble/chunk_data_packs.go index 4573db65e79..e8e6e311db0 100644 --- a/storage/pebble/chunk_data_packs.go +++ b/storage/pebble/chunk_data_packs.go @@ -4,21 +4,45 @@ import ( "fmt" "github.com/cockroachdb/pebble" + "github.com/dgraph-io/badger" "github.com/vmihailenco/msgpack" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module/irrecoverable" + "github.com/onflow/flow-go/module/metrics" "github.com/onflow/flow-go/storage" + "github.com/onflow/flow-go/storage/badger/operation" + "github.com/onflow/flow-go/storage/badger/transaction" ) type ChunkDataPacks struct { - db *pebble.DB - collections storage.Collections + db *pebble.DB + collections storage.Collections + byChunkIDCache *Cache[flow.Identifier, *storage.StoredChunkDataPack] } var _ storage.ChunkDataPacks = (*ChunkDataPacks)(nil) func NewChunkDataPacks(db *pebble.DB, collections storage.Collections) *ChunkDataPacks { + + store := func(key flow.Identifier, val *storage.StoredChunkDataPack) func(*transaction.Tx) error { + return transaction.WithTx(operation.SkipDuplicates(operation.InsertChunkDataPack(val))) + } + + retrieve := func(key flow.Identifier) func(tx *badger.Txn) (*storage.StoredChunkDataPack, error) { + return func(tx *badger.Txn) (*storage.StoredChunkDataPack, error) { + var c storage.StoredChunkDataPack + err := operation.RetrieveChunkDataPack(key, &c)(tx) + return &c, err + } + } + + cache := newCache(collector, metrics.ResourceChunkDataPack, + withLimit[flow.Identifier, *storage.StoredChunkDataPack](byChunkIDCacheSize), + withStore(store), + withRetrieve(retrieve), + ) + return &ChunkDataPacks{ db: db, collections: collections, diff --git a/storage/pebble/open.go b/storage/pebble/open.go index a0d7ea6c0d5..c17756949ca 100644 --- a/storage/pebble/open.go +++ b/storage/pebble/open.go @@ -12,6 +12,8 @@ import ( "github.com/onflow/flow-go/storage/pebble/registers" ) +const DefaultPebbleCacheSize = 1 << 20 + // NewBootstrappedRegistersWithPath initializes a new Registers instance with a pebble db // if the database is not initialized, it close the database and return storage.ErrNotBootstrapped func NewBootstrappedRegistersWithPath(dir string) (*Registers, *pebble.DB, error) { @@ -35,7 +37,7 @@ func NewBootstrappedRegistersWithPath(dir string) (*Registers, *pebble.DB, error // OpenRegisterPebbleDB opens the database func OpenRegisterPebbleDB(dir string) (*pebble.DB, error) { - cache := pebble.NewCache(1 << 20) + cache := pebble.NewCache(DefaultPebbleCacheSize) defer cache.Unref() // currently pebble is only used for registers opts := DefaultPebbleOptions(cache, registers.NewMVCCComparer()) @@ -47,6 +49,18 @@ func OpenRegisterPebbleDB(dir string) (*pebble.DB, error) { return db, nil } +func OpenDefaultPebbleDB(dir string) (*pebble.DB, error) { + cache := pebble.NewCache(DefaultPebbleCacheSize) + defer cache.Unref() + opts := DefaultPebbleOptions(cache, pebble.DefaultComparer) + db, err := pebble.Open(dir, opts) + if err != nil { + return nil, fmt.Errorf("failed to open db: %w", err) + } + + return db, nil +} + // ReadHeightsFromBootstrappedDB reads the first and latest height from a bootstrapped register db // If the register db is not bootstrapped, it returns storage.ErrNotBootstrapped // If the register db is corrupted, it returns an error From 7efc768e95421a810a7a89ce2dd5a204d5e0f2c7 Mon 
Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Fri, 7 Jun 2024 11:41:23 -0700 Subject: [PATCH 09/19] implement pebble chunk data pack with cache --- cmd/execution_builder.go | 2 +- storage/pebble/chunk_data_packs.go | 119 ++++++++++++------- storage/pebble/chunk_data_packs_test.go | 50 ++++++-- storage/pebble/value_cache.go | 150 ++++++++++++++++++++++++ 4 files changed, 265 insertions(+), 56 deletions(-) create mode 100644 storage/pebble/value_cache.go diff --git a/cmd/execution_builder.go b/cmd/execution_builder.go index 5dcb36fe20f..10c34aa804b 100644 --- a/cmd/execution_builder.go +++ b/cmd/execution_builder.go @@ -732,7 +732,7 @@ func (exeNode *ExecutionNode) LoadExecutionState( return nil }) // chunkDataPacks := storage.NewChunkDataPacks(node.Metrics.Cache, chunkDataPackDB, node.Storage.Collections, exeNode.exeConf.chunkDataPackCacheSize) - chunkDataPacks := storagepebble.NewChunkDataPacks(chunkDataPackDB, node.Storage.Collections) + chunkDataPacks := storagepebble.NewChunkDataPacks(node.Metrics.Cache, chunkDataPackDB, node.Storage.Collections, exeNode.exeConf.chunkDataPackCacheSize) // Needed for gRPC server, make sure to assign to main scoped vars exeNode.events = storage.NewEvents(node.Metrics.Cache, node.DB) diff --git a/storage/pebble/chunk_data_packs.go b/storage/pebble/chunk_data_packs.go index e8e6e311db0..0b200af893b 100644 --- a/storage/pebble/chunk_data_packs.go +++ b/storage/pebble/chunk_data_packs.go @@ -4,15 +4,13 @@ import ( "fmt" "github.com/cockroachdb/pebble" - "github.com/dgraph-io/badger" "github.com/vmihailenco/msgpack" "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/module" "github.com/onflow/flow-go/module/irrecoverable" "github.com/onflow/flow-go/module/metrics" "github.com/onflow/flow-go/storage" - "github.com/onflow/flow-go/storage/badger/operation" - "github.com/onflow/flow-go/storage/badger/transaction" ) type ChunkDataPacks struct { @@ -23,40 +21,39 @@ type ChunkDataPacks struct { var _ storage.ChunkDataPacks = (*ChunkDataPacks)(nil) -func NewChunkDataPacks(db *pebble.DB, collections storage.Collections) *ChunkDataPacks { +func NewChunkDataPacks(collector module.CacheMetrics, db *pebble.DB, collections storage.Collections, byChunkIDCacheSize uint) *ChunkDataPacks { - store := func(key flow.Identifier, val *storage.StoredChunkDataPack) func(*transaction.Tx) error { - return transaction.WithTx(operation.SkipDuplicates(operation.InsertChunkDataPack(val))) - } - - retrieve := func(key flow.Identifier) func(tx *badger.Txn) (*storage.StoredChunkDataPack, error) { - return func(tx *badger.Txn) (*storage.StoredChunkDataPack, error) { + retrieve := func(key flow.Identifier) func(pebble.Reader) (*storage.StoredChunkDataPack, error) { + return func(r pebble.Reader) (*storage.StoredChunkDataPack, error) { var c storage.StoredChunkDataPack - err := operation.RetrieveChunkDataPack(key, &c)(tx) + err := RetrieveChunkDataPack(key, &c)(r) return &c, err } } cache := newCache(collector, metrics.ResourceChunkDataPack, withLimit[flow.Identifier, *storage.StoredChunkDataPack](byChunkIDCacheSize), - withStore(store), withRetrieve(retrieve), ) return &ChunkDataPacks{ - db: db, - collections: collections, + db: db, + collections: collections, + byChunkIDCache: cache, } } func (ch *ChunkDataPacks) Store(cs []*flow.ChunkDataPack) error { batch := ch.db.NewBatch() defer batch.Close() + + scs := make([]*storage.StoredChunkDataPack, 0, len(cs)) for _, c := range cs { - err := ch.batchStore(c, batch) + sc, err := ch.batchStore(c, batch) if err != nil { 
return fmt.Errorf("cannot store chunk data pack: %w", err) } + scs = append(scs, sc) } err := batch.Commit(pebble.Sync) @@ -64,16 +61,39 @@ func (ch *ChunkDataPacks) Store(cs []*flow.ChunkDataPack) error { return fmt.Errorf("cannot commit batch: %w", err) } + // TODO: move to batchStore + for _, sc := range scs { + ch.byChunkIDCache.Insert(sc.ChunkID, sc) + } + return nil } func (ch *ChunkDataPacks) Remove(cs []flow.Identifier) error { + batch := ch.db.NewBatch() + + for _, c := range cs { + err := ch.batchRemove(c, batch) + if err != nil { + return fmt.Errorf("cannot remove chunk data pack: %w", err) + } + } + + err := batch.Commit(pebble.Sync) + if err != nil { + return fmt.Errorf("cannot commit batch: %w", err) + } + + for _, c := range cs { + ch.byChunkIDCache.Remove(c) + } + return nil } func (ch *ChunkDataPacks) ByChunkID(chunkID flow.Identifier) (*flow.ChunkDataPack, error) { var sc storage.StoredChunkDataPack - err := RetrieveChunkDataPack(ch.db, chunkID, &sc) + err := RetrieveChunkDataPack(chunkID, &sc)(ch.db) if err != nil { return nil, fmt.Errorf("could not retrieve stored chunk data pack: %w", err) } @@ -97,50 +117,63 @@ func (ch *ChunkDataPacks) ByChunkID(chunkID flow.Identifier) (*flow.ChunkDataPac } func (ch *ChunkDataPacks) BatchRemove(chunkID flow.Identifier, batch storage.BatchStorage) error { - return nil + return fmt.Errorf("not implemented") } -func (ch *ChunkDataPacks) batchStore(c *flow.ChunkDataPack, batch *pebble.Batch) error { +func (ch *ChunkDataPacks) batchRemove(chunkID flow.Identifier, batch pebble.Writer) error { + return batch.Delete(makeKey(codeChunkDataPack, chunkID), nil) +} + +func (ch *ChunkDataPacks) batchStore(c *flow.ChunkDataPack, batch *pebble.Batch) (*storage.StoredChunkDataPack, error) { sc := storage.ToStoredChunkDataPack(c) - return InsertChunkDataPack(batch, sc) + err := InsertChunkDataPack(sc)(batch) + if err != nil { + return nil, fmt.Errorf("failed to store chunk data pack: %w", err) + } + return sc, nil } -func InsertChunkDataPack(batch *pebble.Batch, sc *storage.StoredChunkDataPack) error { +// TODO: move to operation package +func InsertChunkDataPack(sc *storage.StoredChunkDataPack) func(w pebble.Writer) error { key := makeKey(codeChunkDataPack, sc.ChunkID) - return batchWrite(batch, key, sc) + return insert(key, sc) } -func RetrieveChunkDataPack(db *pebble.DB, chunkID flow.Identifier, sc *storage.StoredChunkDataPack) error { +func RetrieveChunkDataPack(chunkID flow.Identifier, sc *storage.StoredChunkDataPack) func(r pebble.Reader) error { key := makeKey(codeChunkDataPack, chunkID) - return retrieve(db, key, sc) + return retrieve(key, sc) } -func batchWrite(batch *pebble.Batch, key []byte, val interface{}) error { - value, err := msgpack.Marshal(val) - if err != nil { - return irrecoverable.NewExceptionf("failed to encode value: %w", err) - } +func insert(key []byte, val interface{}) func(pebble.Writer) error { + return func(w pebble.Writer) error { + value, err := msgpack.Marshal(val) + if err != nil { + return irrecoverable.NewExceptionf("failed to encode value: %w", err) + } - err = batch.Set(key, value, nil) - if err != nil { - return irrecoverable.NewExceptionf("failed to store data: %w", err) - } + err = w.Set(key, value, nil) + if err != nil { + return irrecoverable.NewExceptionf("failed to store data: %w", err) + } - return nil + return nil + } } -func retrieve(db *pebble.DB, key []byte, sc interface{}) error { - val, closer, err := db.Get(key) - if err != nil { - return convertNotFoundError(err) - } - defer closer.Close() +func 
retrieve(key []byte, sc interface{}) func(r pebble.Reader) error { + return func(r pebble.Reader) error { + val, closer, err := r.Get(key) + if err != nil { + return convertNotFoundError(err) + } + defer closer.Close() - err = msgpack.Unmarshal(val, &sc) - if err != nil { - return irrecoverable.NewExceptionf("failed to decode value: %w", err) + err = msgpack.Unmarshal(val, &sc) + if err != nil { + return irrecoverable.NewExceptionf("failed to decode value: %w", err) + } + return nil } - return nil } const ( diff --git a/storage/pebble/chunk_data_packs_test.go b/storage/pebble/chunk_data_packs_test.go index 79d6c5bb908..f170b22114c 100644 --- a/storage/pebble/chunk_data_packs_test.go +++ b/storage/pebble/chunk_data_packs_test.go @@ -1,12 +1,12 @@ package pebble import ( + "errors" "path/filepath" "testing" "github.com/cockroachdb/pebble" "github.com/dgraph-io/badger/v2" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/vmihailenco/msgpack" @@ -35,20 +35,54 @@ func TestMsgPacks(t *testing.T) { // TestChunkDataPacks_Store evaluates correct storage and retrieval of chunk data packs in the storage. // It also evaluates that re-inserting is idempotent. func TestChunkDataPacks_Store(t *testing.T) { - WithChunkDataPacks(t, 100, func(t *testing.T, chunkDataPacks []*flow.ChunkDataPack, chunkDataPackStore *ChunkDataPacks, _ *pebble.DB) { + WithChunkDataPacks(t, 100, func(t *testing.T, chunkDataPacks []*flow.ChunkDataPack, chunkDataPackStore storage.ChunkDataPacks, _ *pebble.DB) { + // can store require.NoError(t, chunkDataPackStore.Store(chunkDataPacks)) + + // can read back + for _, c := range chunkDataPacks { + c2, err := chunkDataPackStore.ByChunkID(c.ChunkID) + require.NoError(t, err) + require.Equal(t, c, c2) + } + + // can store again require.NoError(t, chunkDataPackStore.Store(chunkDataPacks)) + + cids := make([]flow.Identifier, 0, len(chunkDataPacks)) + for i, c := range chunkDataPacks { + // remove everything except the first one + if i > 0 { + cids = append(cids, c.ChunkID) + } + } + // can remove + require.NoError(t, chunkDataPackStore.Remove(cids)) + for i, c := range chunkDataPacks { + if i == 0 { + // the first one is not removed + _, err := chunkDataPackStore.ByChunkID(c.ChunkID) + require.NoError(t, err) + continue + } + // the rest are removed + _, err := chunkDataPackStore.ByChunkID(c.ChunkID) + require.True(t, errors.Is(err, storage.ErrNotFound)) + } + + // can remove again + require.NoError(t, chunkDataPackStore.Remove(cids)) }) } // WithChunkDataPacks is a test helper that generates specified number of chunk data packs, store them using the storeFunc, and // then evaluates whether they are successfully retrieved from storage. -func WithChunkDataPacks(t *testing.T, chunks int, storeFunc func(*testing.T, []*flow.ChunkDataPack, *ChunkDataPacks, *pebble.DB)) { +func WithChunkDataPacks(t *testing.T, chunks int, storeFunc func(*testing.T, []*flow.ChunkDataPack, storage.ChunkDataPacks, *pebble.DB)) { RunWithBadgerDBAndPebbleDB(t, func(badgerDB *badger.DB, db *pebble.DB) { transactions := badgerstorage.NewTransactions(&metrics.NoopCollector{}, badgerDB) collections := badgerstorage.NewCollections(badgerDB, transactions) // keep the cache size at 1 to make sure that entries are written and read from storage itself. 
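+		// With a one-entry LRU, reading two distinct packs guarantees that
+		// at least one ByChunkID call is served from pebble rather than from
+		// the cache. A sketch with hypothetical fixture ids id1 and id2:
+		//
+		//	_, _ = store.ByChunkID(id1) // cached after the read
+		//	_, _ = store.ByChunkID(id2) // evicts id1 (limit is 1)
+		//	_, _ = store.ByChunkID(id1) // must hit pebble again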
-		store := NewChunkDataPacks(db, collections)
+		store := NewChunkDataPacks(&metrics.NoopCollector{}, db, collections, 1)
 
 		chunkDataPacks := unittest.ChunkDataPacksFixture(chunks)
 		for _, chunkDataPack := range chunkDataPacks {
@@ -59,14 +93,6 @@ func WithChunkDataPacks(t *testing.T, chunks int, storeFunc func(*testing.T, []*
 		// stores chunk data packs using the provided store function.
 		storeFunc(t, chunkDataPacks, store, db)
-
-		// stored chunk data packs should be retrieved successfully.
-		for _, expected := range chunkDataPacks {
-			actual, err := store.ByChunkID(expected.ChunkID)
-			require.NoError(t, err)
-
-			assert.Equal(t, expected, actual)
-		}
 	})
 }
 
diff --git a/storage/pebble/value_cache.go b/storage/pebble/value_cache.go
new file mode 100644
index 00000000000..64ca2c91a81
--- /dev/null
+++ b/storage/pebble/value_cache.go
@@ -0,0 +1,150 @@
+package pebble
+
+import (
+	"errors"
+	"fmt"
+
+	"github.com/cockroachdb/pebble"
+	lru "github.com/hashicorp/golang-lru/v2"
+
+	"github.com/onflow/flow-go/module"
+	"github.com/onflow/flow-go/storage"
+)
+
+func withLimit[K comparable, V any](limit uint) func(*Cache[K, V]) {
+	return func(c *Cache[K, V]) {
+		c.limit = limit
+	}
+}
+
+type storeFunc[K comparable, V any] func(key K, val V) func(pebble.Writer) error
+
+func withStore[K comparable, V any](store storeFunc[K, V]) func(*Cache[K, V]) {
+	return func(c *Cache[K, V]) {
+		c.store = store
+	}
+}
+
+func noStore[K comparable, V any](_ K, _ V) func(pebble.Writer) error {
+	return func(pebble.Writer) error {
+		return fmt.Errorf("no store function for cache put available")
+	}
+}
+
+func noopStore[K comparable, V any](_ K, _ V) func(pebble.Reader) error {
+	return func(pebble.Reader) error {
+		return nil
+	}
+}
+
+type retrieveFunc[K comparable, V any] func(key K) func(pebble.Reader) (V, error)
+
+func withRetrieve[K comparable, V any](retrieve retrieveFunc[K, V]) func(*Cache[K, V]) {
+	return func(c *Cache[K, V]) {
+		c.retrieve = retrieve
+	}
+}
+
+func noRetrieve[K comparable, V any](_ K) func(pebble.Reader) (V, error) {
+	return func(pebble.Reader) (V, error) {
+		var nullV V
+		return nullV, fmt.Errorf("no retrieve function for cache get available")
+	}
+}
+
+type Cache[K comparable, V any] struct {
+	metrics  module.CacheMetrics
+	limit    uint
+	store    storeFunc[K, V]
+	retrieve retrieveFunc[K, V]
+	resource string
+	cache    *lru.Cache[K, V]
+}
+
+func newCache[K comparable, V any](collector module.CacheMetrics, resourceName string, options ...func(*Cache[K, V])) *Cache[K, V] {
+	c := Cache[K, V]{
+		metrics:  collector,
+		limit:    1000,
+		store:    noStore[K, V],
+		retrieve: noRetrieve[K, V],
+		resource: resourceName,
+	}
+	for _, option := range options {
+		option(&c)
+	}
+	c.cache, _ = lru.New[K, V](int(c.limit))
+	c.metrics.CacheEntries(c.resource, uint(c.cache.Len()))
+	return &c
+}
+
+// IsCached returns true if the key exists in the cache.
+// It DOES NOT check whether the key exists in the underlying data store.
+func (c *Cache[K, V]) IsCached(key K) bool {
+	return c.cache.Contains(key)
+}
+
+// Get will try to retrieve the resource from cache first, and then from the
+// injected retrieve function. During normal operations, the following error returns are expected:
+// - `storage.ErrNotFound` if key is unknown.
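+//
+// A minimal usage sketch, assuming a Cache[flow.Identifier, *storage.StoredChunkDataPack]
+// named cache and a pebble.Reader named r:
+//
+//	sc, err := cache.Get(chunkID)(r)
+//	if errors.Is(err, storage.ErrNotFound) {
+//		// chunkID is neither cached nor present in pebble
+//	}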
+func (c *Cache[K, V]) Get(key K) func(pebble.Reader) (V, error) { + return func(r pebble.Reader) (V, error) { + + // check if we have it in the cache + resource, cached := c.cache.Get(key) + if cached { + c.metrics.CacheHit(c.resource) + return resource, nil + } + + // get it from the database + resource, err := c.retrieve(key)(r) + if err != nil { + if errors.Is(err, storage.ErrNotFound) { + c.metrics.CacheNotFound(c.resource) + } + var nullV V + return nullV, fmt.Errorf("could not retrieve resource: %w", err) + } + + c.metrics.CacheMiss(c.resource) + + // cache the resource and eject least recently used one if we reached limit + evicted := c.cache.Add(key, resource) + if !evicted { + c.metrics.CacheEntries(c.resource, uint(c.cache.Len())) + } + + return resource, nil + } +} + +func (c *Cache[K, V]) Remove(key K) { + c.cache.Remove(key) +} + +// Insert will add a resource directly to the cache with the given ID +// assuming the resource has been added to storage already. +func (c *Cache[K, V]) Insert(key K, resource V) { + // cache the resource and eject least recently used one if we reached limit + evicted := c.cache.Add(key, resource) + if !evicted { + c.metrics.CacheEntries(c.resource, uint(c.cache.Len())) + } +} + +// PutTx will return tx which adds a resource to the cache with the given ID. +func (c *Cache[K, V]) PutTx(key K, resource V) func(pebble.Writer) error { + storeOps := c.store(key, resource) // assemble DB operations to store resource (no execution) + + return func(w pebble.Writer) error { + // the storeOps must be sync operation + err := storeOps(w) // execute operations to store resource + if err != nil { + return fmt.Errorf("could not store resource: %w", err) + } + + c.Insert(key, resource) + + return nil + } +} From 86e6267666475ca12bac231c47bfe9a39591fceb Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Fri, 7 Jun 2024 11:46:57 -0700 Subject: [PATCH 10/19] move modules to operations package --- storage/pebble/chunk_data_packs.go | 62 ++------------------ storage/pebble/operations/chunk_data_pack.go | 25 ++++++++ storage/pebble/operations/codes.go | 5 ++ storage/pebble/operations/common.go | 55 +++++++++++++++++ 4 files changed, 90 insertions(+), 57 deletions(-) create mode 100644 storage/pebble/operations/chunk_data_pack.go create mode 100644 storage/pebble/operations/codes.go create mode 100644 storage/pebble/operations/common.go diff --git a/storage/pebble/chunk_data_packs.go b/storage/pebble/chunk_data_packs.go index 0b200af893b..68632dc8ec7 100644 --- a/storage/pebble/chunk_data_packs.go +++ b/storage/pebble/chunk_data_packs.go @@ -4,13 +4,12 @@ import ( "fmt" "github.com/cockroachdb/pebble" - "github.com/vmihailenco/msgpack" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module" - "github.com/onflow/flow-go/module/irrecoverable" "github.com/onflow/flow-go/module/metrics" "github.com/onflow/flow-go/storage" + "github.com/onflow/flow-go/storage/pebble/operations" ) type ChunkDataPacks struct { @@ -26,7 +25,7 @@ func NewChunkDataPacks(collector module.CacheMetrics, db *pebble.DB, collections retrieve := func(key flow.Identifier) func(pebble.Reader) (*storage.StoredChunkDataPack, error) { return func(r pebble.Reader) (*storage.StoredChunkDataPack, error) { var c storage.StoredChunkDataPack - err := RetrieveChunkDataPack(key, &c)(r) + err := operations.RetrieveChunkDataPack(key, &c)(r) return &c, err } } @@ -93,7 +92,7 @@ func (ch *ChunkDataPacks) Remove(cs []flow.Identifier) error { func (ch *ChunkDataPacks) ByChunkID(chunkID 
flow.Identifier) (*flow.ChunkDataPack, error) { var sc storage.StoredChunkDataPack - err := RetrieveChunkDataPack(chunkID, &sc)(ch.db) + err := operations.RetrieveChunkDataPack(chunkID, &sc)(ch.db) if err != nil { return nil, fmt.Errorf("could not retrieve stored chunk data pack: %w", err) } @@ -121,65 +120,14 @@ func (ch *ChunkDataPacks) BatchRemove(chunkID flow.Identifier, batch storage.Bat } func (ch *ChunkDataPacks) batchRemove(chunkID flow.Identifier, batch pebble.Writer) error { - return batch.Delete(makeKey(codeChunkDataPack, chunkID), nil) + return operations.RemoveChunkDataPack(chunkID)(batch) } func (ch *ChunkDataPacks) batchStore(c *flow.ChunkDataPack, batch *pebble.Batch) (*storage.StoredChunkDataPack, error) { sc := storage.ToStoredChunkDataPack(c) - err := InsertChunkDataPack(sc)(batch) + err := operations.InsertChunkDataPack(sc)(batch) if err != nil { return nil, fmt.Errorf("failed to store chunk data pack: %w", err) } return sc, nil } - -// TODO: move to operation package -func InsertChunkDataPack(sc *storage.StoredChunkDataPack) func(w pebble.Writer) error { - key := makeKey(codeChunkDataPack, sc.ChunkID) - return insert(key, sc) -} - -func RetrieveChunkDataPack(chunkID flow.Identifier, sc *storage.StoredChunkDataPack) func(r pebble.Reader) error { - key := makeKey(codeChunkDataPack, chunkID) - return retrieve(key, sc) -} - -func insert(key []byte, val interface{}) func(pebble.Writer) error { - return func(w pebble.Writer) error { - value, err := msgpack.Marshal(val) - if err != nil { - return irrecoverable.NewExceptionf("failed to encode value: %w", err) - } - - err = w.Set(key, value, nil) - if err != nil { - return irrecoverable.NewExceptionf("failed to store data: %w", err) - } - - return nil - } -} - -func retrieve(key []byte, sc interface{}) func(r pebble.Reader) error { - return func(r pebble.Reader) error { - val, closer, err := r.Get(key) - if err != nil { - return convertNotFoundError(err) - } - defer closer.Close() - - err = msgpack.Unmarshal(val, &sc) - if err != nil { - return irrecoverable.NewExceptionf("failed to decode value: %w", err) - } - return nil - } -} - -const ( - codeChunkDataPack = 100 -) - -func makeKey(prefix byte, chunkID flow.Identifier) []byte { - return append([]byte{prefix}, chunkID[:]...) 
-} diff --git a/storage/pebble/operations/chunk_data_pack.go b/storage/pebble/operations/chunk_data_pack.go new file mode 100644 index 00000000000..e4ede3dd9e3 --- /dev/null +++ b/storage/pebble/operations/chunk_data_pack.go @@ -0,0 +1,25 @@ +package operations + +import ( + "github.com/cockroachdb/pebble" + + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/storage" +) + +func InsertChunkDataPack(sc *storage.StoredChunkDataPack) func(w pebble.Writer) error { + key := makeKey(codeChunkDataPack, sc.ChunkID) + return insert(key, sc) +} + +func RetrieveChunkDataPack(chunkID flow.Identifier, sc *storage.StoredChunkDataPack) func(r pebble.Reader) error { + key := makeKey(codeChunkDataPack, chunkID) + return retrieve(key, sc) +} + +func RemoveChunkDataPack(chunkID flow.Identifier) func(w pebble.Writer) error { + key := makeKey(codeChunkDataPack, chunkID) + return func(w pebble.Writer) error { + return w.Delete(key, nil) + } +} diff --git a/storage/pebble/operations/codes.go b/storage/pebble/operations/codes.go new file mode 100644 index 00000000000..0f803e5e0c0 --- /dev/null +++ b/storage/pebble/operations/codes.go @@ -0,0 +1,5 @@ +package operations + +const ( + codeChunkDataPack = 100 +) diff --git a/storage/pebble/operations/common.go b/storage/pebble/operations/common.go new file mode 100644 index 00000000000..a093ca459c7 --- /dev/null +++ b/storage/pebble/operations/common.go @@ -0,0 +1,55 @@ +package operations + +import ( + "errors" + + "github.com/cockroachdb/pebble" + "github.com/vmihailenco/msgpack" + + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/module/irrecoverable" + "github.com/onflow/flow-go/storage" +) + +func insert(key []byte, val interface{}) func(pebble.Writer) error { + return func(w pebble.Writer) error { + value, err := msgpack.Marshal(val) + if err != nil { + return irrecoverable.NewExceptionf("failed to encode value: %w", err) + } + + err = w.Set(key, value, nil) + if err != nil { + return irrecoverable.NewExceptionf("failed to store data: %w", err) + } + + return nil + } +} + +func retrieve(key []byte, sc interface{}) func(r pebble.Reader) error { + return func(r pebble.Reader) error { + val, closer, err := r.Get(key) + if err != nil { + return convertNotFoundError(err) + } + defer closer.Close() + + err = msgpack.Unmarshal(val, &sc) + if err != nil { + return irrecoverable.NewExceptionf("failed to decode value: %w", err) + } + return nil + } +} + +func makeKey(prefix byte, identifier flow.Identifier) []byte { + return append([]byte{prefix}, identifier[:]...) 
+}
+
+func convertNotFoundError(err error) error {
+	if errors.Is(err, pebble.ErrNotFound) {
+		return storage.ErrNotFound
+	}
+	return err
+}
From 03fdb3924837fa73e282c680d96a950ad9cb535b Mon Sep 17 00:00:00 2001
From: "Leo Zhang (zhangchiqing)" 
Date: Fri, 7 Jun 2024 12:57:59 -0700
Subject: [PATCH 11/19] extract batch

---
 storage/pebble/batch.go            | 57 ++++++++++++++++++++++++++++++
 storage/pebble/chunk_data_packs.go | 25 ++++++-------
 storage/pebble/value_cache.go      | 22 ++++++------
 3 files changed, 78 insertions(+), 26 deletions(-)
 create mode 100644 storage/pebble/batch.go

diff --git a/storage/pebble/batch.go b/storage/pebble/batch.go
new file mode 100644
index 00000000000..e952d06c2d2
--- /dev/null
+++ b/storage/pebble/batch.go
@@ -0,0 +1,57 @@
+package pebble
+
+import (
+	"sync"
+
+	"github.com/cockroachdb/pebble"
+)
+
+type Batch struct {
+	writer *pebble.Batch
+
+	lock      sync.RWMutex
+	callbacks []func()
+}
+
+func NewBatch(db *pebble.DB) *Batch {
+	batch := db.NewBatch()
+	return &Batch{
+		writer:    batch,
+		callbacks: make([]func(), 0),
+	}
+}
+
+func (b *Batch) GetWriter() *pebble.Batch {
+	return b.writer
+}
+
+// OnSucceed adds a callback to execute after the batch has
+// been successfully flushed.
+// useful for implementing the cache where we will only cache
+// after the batch has been successfully flushed
+func (b *Batch) OnSucceed(callback func()) {
+	b.lock.Lock()
+	defer b.lock.Unlock()
+	b.callbacks = append(b.callbacks, callback)
+}
+
+// Flush will commit the underlying pebble batch; in
+// addition, it will call the callbacks added by
+// OnSucceed
+func (b *Batch) Flush() error {
+	err := b.writer.Commit(nil)
+	if err != nil {
+		return err
+	}
+
+	b.lock.RLock()
+	defer b.lock.RUnlock()
+	for _, callback := range b.callbacks {
+		callback()
+	}
+	return nil
+}
+
+func (b *Batch) Close() error {
+	return b.writer.Close()
+}
diff --git a/storage/pebble/chunk_data_packs.go b/storage/pebble/chunk_data_packs.go
index 68632dc8ec7..43e7e6cdc8b 100644
--- a/storage/pebble/chunk_data_packs.go
+++ b/storage/pebble/chunk_data_packs.go
@@ -43,28 +43,21 @@ func NewChunkDataPacks(collector module.CacheMetrics, db *pebble.DB, collections
 }
 
 func (ch *ChunkDataPacks) Store(cs []*flow.ChunkDataPack) error {
-	batch := ch.db.NewBatch()
+	batch := NewBatch(ch.db)
 	defer batch.Close()
 
-	scs := make([]*storage.StoredChunkDataPack, 0, len(cs))
 	for _, c := range cs {
-		sc, err := ch.batchStore(c, batch)
+		err := ch.batchStore(c, batch)
 		if err != nil {
 			return fmt.Errorf("cannot store chunk data pack: %w", err)
 		}
-		scs = append(scs, sc)
 	}
 
-	err := batch.Commit(pebble.Sync)
+	err := batch.Flush()
 	if err != nil {
 		return fmt.Errorf("cannot commit batch: %w", err)
 	}
 
-	// TODO: move to batchStore
-	for _, sc := range scs {
-		ch.byChunkIDCache.Insert(sc.ChunkID, sc)
-	}
-
 	return nil
 }
 
@@ -123,11 +116,15 @@ func (ch *ChunkDataPacks) batchRemove(chunkID flow.Identifier, batch pebble.Writ
 	return operations.RemoveChunkDataPack(chunkID)(batch)
 }
 
-func (ch *ChunkDataPacks) batchStore(c *flow.ChunkDataPack, batch *pebble.Batch) (*storage.StoredChunkDataPack, error) {
+func (ch *ChunkDataPacks) batchStore(c *flow.ChunkDataPack, batch *Batch) error {
 	sc := storage.ToStoredChunkDataPack(c)
-	err := operations.InsertChunkDataPack(sc)(batch)
+	writer := batch.GetWriter()
+	batch.OnSucceed(func() {
+		ch.byChunkIDCache.Insert(sc.ChunkID, sc)
+	})
+	err := operations.InsertChunkDataPack(sc)(writer)
 	if err != nil {
-		return nil, fmt.Errorf("failed to store chunk data pack: %w", err)
+		return fmt.Errorf("failed to store chunk data pack: %w", err)
 	}
-	return sc, nil
+	return nil
 }
diff --git a/storage/pebble/value_cache.go b/storage/pebble/value_cache.go
index 64ca2c91a81..fdf2b9044c7 100644
--- a/storage/pebble/value_cache.go
+++ b/storage/pebble/value_cache.go
@@ -19,24 +19,22 @@ func withLimit[K comparable, V any](limit uint) func(*Cache[K, V]) {
 
 type storeFunc[K comparable, V any] func(key K, val V) func(pebble.Writer) error
 
-func withStore[K comparable, V any](store storeFunc[K, V]) func(*Cache[K, V]) {
-	return func(c *Cache[K, V]) {
-		c.store = store
-	}
-}
-
+// func withStore[K comparable, V any](store storeFunc[K, V]) func(*Cache[K, V]) {
+// 	return func(c *Cache[K, V]) {
+// 		c.store = store
+// 	}
+// }
 func noStore[K comparable, V any](_ K, _ V) func(pebble.Writer) error {
 	return func(pebble.Writer) error {
 		return fmt.Errorf("no store function for cache put available")
 	}
 }
 
-func noopStore[K comparable, V any](_ K, _ V) func(pebble.Reader) error {
-	return func(pebble.Reader) error {
-		return nil
-	}
-}
-
+// func noopStore[K comparable, V any](_ K, _ V) func(pebble.Reader) error {
+// 	return func(pebble.Reader) error {
+// 		return nil
+// 	}
+// }
type retrieveFunc[K comparable, V any] func(key K) func(pebble.Reader) (V, error)
 
 func withRetrieve[K comparable, V any](retrieve retrieveFunc[K, V]) func(*Cache[K, V]) {
From ad06637cee71a5fe41f6c1121fe78238c24b2636 Mon Sep 17 00:00:00 2001
From: "Leo Zhang (zhangchiqing)" 
Date: Fri, 7 Jun 2024 15:02:51 -0700
Subject: [PATCH 12/19] fix lint

---
 cmd/execution_builder.go | 48 +++++++++++++++++++++-------------------
 1 file changed, 25 insertions(+), 23 deletions(-)

diff --git a/cmd/execution_builder.go b/cmd/execution_builder.go
index 10c34aa804b..0cac773bb43 100644
--- a/cmd/execution_builder.go
+++ b/cmd/execution_builder.go
@@ -13,6 +13,7 @@ import (
 	awsconfig "github.com/aws/aws-sdk-go-v2/config"
 	"github.com/aws/aws-sdk-go-v2/service/s3"
+	badgerDB "github.com/dgraph-io/badger/v2"
 	"github.com/ipfs/boxo/bitswap"
 	"github.com/ipfs/go-cid"
 	badger "github.com/ipfs/go-ds-badger2"
@@ -93,6 +94,7 @@ import (
 	storage "github.com/onflow/flow-go/storage/badger"
 	"github.com/onflow/flow-go/storage/badger/procedure"
 	storagepebble "github.com/onflow/flow-go/storage/pebble"
+	sutil "github.com/onflow/flow-go/storage/util"
 )
 
 const (
@@ -689,29 +691,29 @@ func (exeNode *ExecutionNode) LoadExecutionDataGetter(node *NodeConfig) error {
 	return nil
 }
 
-// func openChunkDataPackDB(dbPath string, logger zerolog.Logger) (*badgerDB.DB, error) {
-// 	log := sutil.NewLogger(logger)
-//
-// 	opts := badgerDB.
-// 		DefaultOptions(dbPath).
-// 		WithKeepL0InMemory(true).
-// 		WithLogger(log).
-//
-// 		// the ValueLogFileSize option specifies how big the value of a
-// 		// key-value pair is allowed to be saved into badger.
-// 		// exceeding this limit, will fail with an error like this:
-// 		// could not store data: Value with size exceeded 1073741824 limit
-// 		// Maximum value size is 10G, needed by execution node
-// 		// TODO: finding a better max value for each node type
-// 		WithValueLogFileSize(256 << 23).
-// 		WithValueLogMaxEntries(100000) // Default is 1000000
-//
-// 	db, err := badgerDB.Open(opts)
-// 	if err != nil {
-// 		return nil, fmt.Errorf("could not open chunk data pack badger db at path %v: %w", dbPath, err)
-// 	}
-// 	return db, nil
-// }
+func OpenChunkDataPackDB(dbPath string, logger zerolog.Logger) (*badgerDB.DB, error) {
+	log := sutil.NewLogger(logger)
+
+	opts := badgerDB.
+		DefaultOptions(dbPath).
+		WithKeepL0InMemory(true).
+		WithLogger(log).
+ + // the ValueLogFileSize option specifies how big the value of a + // key-value pair is allowed to be saved into badger. + // exceeding this limit, will fail with an error like this: + // could not store data: Value with size exceeded 1073741824 limit + // Maximum value size is 10G, needed by execution node + // TODO: finding a better max value for each node type + WithValueLogFileSize(256 << 23). + WithValueLogMaxEntries(100000) // Default is 1000000 + + db, err := badgerDB.Open(opts) + if err != nil { + return nil, fmt.Errorf("could not open chunk data pack badger db at path %v: %w", dbPath, err) + } + return db, nil +} func (exeNode *ExecutionNode) LoadExecutionState( node *NodeConfig, From 7e8d0d6bde956ed33a36bd6839ce8667759a573c Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Mon, 10 Jun 2024 09:06:53 -0700 Subject: [PATCH 13/19] update mock --- storage/mock/chunk_data_packs.go | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/storage/mock/chunk_data_packs.go b/storage/mock/chunk_data_packs.go index 3fbacab10d8..2db2f6dbe63 100644 --- a/storage/mock/chunk_data_packs.go +++ b/storage/mock/chunk_data_packs.go @@ -32,24 +32,6 @@ func (_m *ChunkDataPacks) BatchRemove(chunkID flow.Identifier, batch storage.Bat return r0 } -// BatchStore provides a mock function with given fields: c, batch -func (_m *ChunkDataPacks) BatchStore(c *flow.ChunkDataPack, batch storage.BatchStorage) error { - ret := _m.Called(c, batch) - - if len(ret) == 0 { - panic("no return value specified for BatchStore") - } - - var r0 error - if rf, ok := ret.Get(0).(func(*flow.ChunkDataPack, storage.BatchStorage) error); ok { - r0 = rf(c, batch) - } else { - r0 = ret.Error(0) - } - - return r0 -} - // ByChunkID provides a mock function with given fields: chunkID func (_m *ChunkDataPacks) ByChunkID(chunkID flow.Identifier) (*flow.ChunkDataPack, error) { ret := _m.Called(chunkID) From 466f9a6e1cac5b3f8650cea8651aca4f1f181557 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Mon, 10 Jun 2024 09:30:10 -0700 Subject: [PATCH 14/19] fix chunk data pack codec --- model/flow/chunk.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/model/flow/chunk.go b/model/flow/chunk.go index 90fd760c0f4..2de03307a5c 100644 --- a/model/flow/chunk.go +++ b/model/flow/chunk.go @@ -235,6 +235,9 @@ func (b *BlockExecutionDataRoot) UnmarshalMsgpack(data []byte) error { // Helper function to convert a slice of cid.Cid to a slice of strings func cidsToStrings(cids []cid.Cid) []string { + if cids == nil { + return nil + } strs := make([]string, len(cids)) for i, c := range cids { strs[i] = c.String() @@ -244,6 +247,9 @@ func cidsToStrings(cids []cid.Cid) []string { // Helper function to convert a slice of strings to a slice of cid.Cid func stringsToCids(strs []string) []cid.Cid { + if strs == nil { + return nil + } cids := make([]cid.Cid, len(strs)) for i, s := range strs { c, err := cid.Decode(s) From cde5e81fa433856afaee118f8cda6e4115dd4eee Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Mon, 10 Jun 2024 16:35:24 -0700 Subject: [PATCH 15/19] add comments --- cmd/execution_builder.go | 6 ++++-- model/flow/chunk.go | 17 ++++++++++++----- storage/pebble/batch.go | 1 + storage/pebble/chunk_data_packs.go | 8 ++++++++ storage/pebble/open.go | 6 ++++++ storage/pebble/operations/chunk_data_pack.go | 6 ++++++ storage/pebble/value_cache.go | 4 ++++ 7 files changed, 41 insertions(+), 7 deletions(-) diff --git a/cmd/execution_builder.go b/cmd/execution_builder.go index 
0cac773bb43..2f1795cd8e0 100644
--- a/cmd/execution_builder.go
+++ b/cmd/execution_builder.go
@@ -733,8 +733,10 @@ func (exeNode *ExecutionNode) LoadExecutionState(
 		}
 		return nil
 	})
-	// chunkDataPacks := storage.NewChunkDataPacks(node.Metrics.Cache, chunkDataPackDB, node.Storage.Collections, exeNode.exeConf.chunkDataPackCacheSize)
-	chunkDataPacks := storagepebble.NewChunkDataPacks(node.Metrics.Cache, chunkDataPackDB, node.Storage.Collections, exeNode.exeConf.chunkDataPackCacheSize)
+	// chunkDataPacks := storage.NewChunkDataPacks(node.Metrics.Cache,
+	// chunkDataPackDB, node.Storage.Collections, exeNode.exeConf.chunkDataPackCacheSize)
+	chunkDataPacks := storagepebble.NewChunkDataPacks(node.Metrics.Cache,
+		chunkDataPackDB, node.Storage.Collections, exeNode.exeConf.chunkDataPackCacheSize)
 
 	// Needed for gRPC server, make sure to assign to main scoped vars
 	exeNode.events = storage.NewEvents(node.Metrics.Cache, node.DB)
diff --git a/model/flow/chunk.go b/model/flow/chunk.go
index 2de03307a5c..83eabde4b1e 100644
--- a/model/flow/chunk.go
+++ b/model/flow/chunk.go
@@ -1,6 +1,7 @@
 package flow
 
 import (
+	"fmt"
 	"log"
 
 	"github.com/ipfs/go-cid"
@@ -228,7 +229,13 @@ func (b *BlockExecutionDataRoot) UnmarshalMsgpack(data []byte) error {
 	}
 
 	b.BlockID = temp.BlockID
-	b.ChunkExecutionDataIDs = stringsToCids(temp.ChunkExecutionDataIDs)
+	cids, err := stringsToCids(temp.ChunkExecutionDataIDs)
+
+	if err != nil {
+		return fmt.Errorf("failed to decode chunk execution data ids: %w", err)
+	}
+
+	b.ChunkExecutionDataIDs = cids
 
 	return nil
 }
@@ -246,17 +253,17 @@ func cidsToStrings(cids []cid.Cid) []string {
 }
 
 // Helper function to convert a slice of strings to a slice of cid.Cid
-func stringsToCids(strs []string) []cid.Cid {
+func stringsToCids(strs []string) ([]cid.Cid, error) {
 	if strs == nil {
-		return nil
+		return nil, nil
 	}
 	cids := make([]cid.Cid, len(strs))
 	for i, s := range strs {
 		c, err := cid.Decode(s)
 		if err != nil {
-			panic(err) // Handle error appropriately in real code
+			return nil, fmt.Errorf("failed to decode cid %v: %w", s, err)
 		}
 		cids[i] = c
 	}
-	return cids
+	return cids, nil
 }
diff --git a/storage/pebble/batch.go b/storage/pebble/batch.go
index e952d06c2d2..9a45e55bc02 100644
--- a/storage/pebble/batch.go
+++ b/storage/pebble/batch.go
@@ -38,6 +38,7 @@
 // Flush will commit the underlying pebble batch; in
 // addition, it will call the callbacks added by
 // OnSucceed
+// any errors are exceptions
 func (b *Batch) Flush() error {
 	err := b.writer.Commit(nil)
 	if err != nil {
diff --git a/storage/pebble/chunk_data_packs.go b/storage/pebble/chunk_data_packs.go
index 43e7e6cdc8b..73037e988ed 100644
--- a/storage/pebble/chunk_data_packs.go
+++ b/storage/pebble/chunk_data_packs.go
@@ -42,6 +42,8 @@ func NewChunkDataPacks(collector module.CacheMetrics, db *pebble.DB, collections
 	}
 }
 
+// Store stores the given chunk data packs atomically.
+// Any errors are exceptions
 func (ch *ChunkDataPacks) Store(cs []*flow.ChunkDataPack) error {
 	batch := NewBatch(ch.db)
 	defer batch.Close()
@@ -61,6 +63,8 @@ func (ch *ChunkDataPacks) Store(cs []*flow.ChunkDataPack) error {
 	return nil
 }
 
+// Remove removes the chunk data packs with the given IDs atomically.
+// Any errors are exceptions
 func (ch *ChunkDataPacks) Remove(cs []flow.Identifier) error {
 	batch := ch.db.NewBatch()
 
@@ -83,6 +87,9 @@ func (ch *ChunkDataPacks) Remove(cs []flow.Identifier) error {
 	return nil
 }
 
+// ByChunkID finds the chunk data pack by chunk ID.
+// it returns storage.ErrNotFound if the chunk data pack is not found
+// other errors are exceptions
 func (ch *ChunkDataPacks) ByChunkID(chunkID flow.Identifier) (*flow.ChunkDataPack, error) {
 	var sc storage.StoredChunkDataPack
 	err := operations.RetrieveChunkDataPack(chunkID, &sc)(ch.db)
@@ -108,6 +115,7 @@ func (ch *ChunkDataPacks) ByChunkID(chunkID flow.Identifier) (*flow.ChunkDataPac
 	return chdp, nil
 }
 
+// BatchRemove is not used in the pebble implementation
 func (ch *ChunkDataPacks) BatchRemove(chunkID flow.Identifier, batch storage.BatchStorage) error {
 	return fmt.Errorf("not implemented")
 }
diff --git a/storage/pebble/open.go b/storage/pebble/open.go
index c17756949ca..80f328ce87a 100644
--- a/storage/pebble/open.go
+++ b/storage/pebble/open.go
@@ -36,6 +36,10 @@ func NewBootstrappedRegistersWithPath(dir string) (*Registers, *pebble.DB, error
 }
 
 // OpenRegisterPebbleDB opens the database
+// The difference from OpenDefaultPebbleDB is that it uses
+// a customized comparer (NewMVCCComparer) which is needed to
+// implement finding register values at any given height using
+// pebble's SeekPrefixGE function
 func OpenRegisterPebbleDB(dir string) (*pebble.DB, error) {
 	cache := pebble.NewCache(DefaultPebbleCacheSize)
 	defer cache.Unref()
@@ -49,6 +53,8 @@ func OpenRegisterPebbleDB(dir string) (*pebble.DB, error) {
 	return db, nil
 }
 
+// OpenDefaultPebbleDB opens a pebble database using default options,
+// such as cache size and comparer
 func OpenDefaultPebbleDB(dir string) (*pebble.DB, error) {
 	cache := pebble.NewCache(DefaultPebbleCacheSize)
 	defer cache.Unref()
diff --git a/storage/pebble/operations/chunk_data_pack.go b/storage/pebble/operations/chunk_data_pack.go
index e4ede3dd9e3..7b93f72d61a 100644
--- a/storage/pebble/operations/chunk_data_pack.go
+++ b/storage/pebble/operations/chunk_data_pack.go
@@ -7,16 +7,22 @@ import (
 	"github.com/onflow/flow-go/storage"
 )
 
+// InsertChunkDataPack inserts a chunk data pack keyed by chunk ID.
+// any errors are exceptions
 func InsertChunkDataPack(sc *storage.StoredChunkDataPack) func(w pebble.Writer) error {
 	key := makeKey(codeChunkDataPack, sc.ChunkID)
 	return insert(key, sc)
 }
 
+// RetrieveChunkDataPack retrieves a chunk data pack by chunk ID.
+// it returns storage.ErrNotFound if the chunk data pack is not found
 func RetrieveChunkDataPack(chunkID flow.Identifier, sc *storage.StoredChunkDataPack) func(r pebble.Reader) error {
 	key := makeKey(codeChunkDataPack, chunkID)
 	return retrieve(key, sc)
 }
 
+// RemoveChunkDataPack removes the chunk data pack with the given chunk ID.
+// any errors are exceptions
 func RemoveChunkDataPack(chunkID flow.Identifier) func(w pebble.Writer) error {
 	key := makeKey(codeChunkDataPack, chunkID)
 	return func(w pebble.Writer) error {
diff --git a/storage/pebble/value_cache.go b/storage/pebble/value_cache.go
index fdf2b9044c7..38f1f394910 100644
--- a/storage/pebble/value_cache.go
+++ b/storage/pebble/value_cache.go
@@ -50,6 +50,10 @@ func noRetrieve[K comparable, V any](_ K) func(pebble.Reader) (V, error) {
 	}
 }
 
+// Cache is a read-through cache for the underlying storage layer.
+// Note: when a resource is found in neither the cache nor the underlying storage,
+// it will not be cached. In other words, finding the missing item again will
+// query the underlying storage again.
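+//
+// A read-through lookup therefore composes the retrieve closure with a
+// reader, e.g. (sketch): val, err := c.Get(key)(db) checks the LRU first
+// and only falls back to the injected retrieve function against pebble.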
 type Cache[K comparable, V any] struct {
 	metrics  module.CacheMetrics
 	limit    uint
From 22daffdb72b8fbd1f7434d0e1aa44a1b5e909f13 Mon Sep 17 00:00:00 2001
From: "Leo Zhang (zhangchiqing)" 
Date: Tue, 11 Jun 2024 11:50:24 -0700
Subject: [PATCH 16/19] address review comments

---
 storage/pebble/chunk_data_packs.go            | 18 ++++++++++++------
 .../chunk_data_pack.go                        |  6 +++++-
 .../pebble/{operations => operation}/codes.go |  2 +-
 .../pebble/{operations => operation}/common.go |  7 +------
 4 files changed, 19 insertions(+), 14 deletions(-)
 rename storage/pebble/{operations => operation}/chunk_data_pack.go (88%)
 rename storage/pebble/{operations => operation}/codes.go (65%)
 rename storage/pebble/{operations => operation}/common.go (86%)

diff --git a/storage/pebble/chunk_data_packs.go b/storage/pebble/chunk_data_packs.go
index 73037e988ed..c0b5b47eeab 100644
--- a/storage/pebble/chunk_data_packs.go
+++ b/storage/pebble/chunk_data_packs.go
@@ -4,12 +4,13 @@ import (
 	"fmt"
 
 	"github.com/cockroachdb/pebble"
+	"github.com/rs/zerolog/log"
 
 	"github.com/onflow/flow-go/model/flow"
 	"github.com/onflow/flow-go/module"
 	"github.com/onflow/flow-go/module/metrics"
 	"github.com/onflow/flow-go/storage"
-	"github.com/onflow/flow-go/storage/pebble/operations"
+	"github.com/onflow/flow-go/storage/pebble/operation"
 )
@@ -25,7 +26,7 @@ func NewChunkDataPacks(collector module.CacheMetrics, db *pebble.DB, collections
 	retrieve := func(key flow.Identifier) func(pebble.Reader) (*storage.StoredChunkDataPack, error) {
 		return func(r pebble.Reader) (*storage.StoredChunkDataPack, error) {
 			var c storage.StoredChunkDataPack
-			err := operations.RetrieveChunkDataPack(key, &c)(r)
+			err := operation.RetrieveChunkDataPack(key, &c)(r)
 			return &c, err
 		}
 	}
@@ -46,7 +47,12 @@ func NewChunkDataPacks(collector module.CacheMetrics, db *pebble.DB, collections
 // Any errors are exceptions
 func (ch *ChunkDataPacks) Store(cs []*flow.ChunkDataPack) error {
 	batch := NewBatch(ch.db)
-	defer batch.Close()
+	defer func() {
+		err := batch.Close()
+		if err != nil {
+			log.Error().Err(err).Msg("failed to close batch when storing chunk data pack")
+		}
+	}()
 
 	for _, c := range cs {
 		err := ch.batchStore(c, batch)
@@ -92,7 +98,7 @@ func (ch *ChunkDataPacks) Remove(cs []flow.Identifier) error {
 // other errors are exceptions
 func (ch *ChunkDataPacks) ByChunkID(chunkID flow.Identifier) (*flow.ChunkDataPack, error) {
 	var sc storage.StoredChunkDataPack
-	err := operations.RetrieveChunkDataPack(chunkID, &sc)(ch.db)
+	err := operation.RetrieveChunkDataPack(chunkID, &sc)(ch.db)
 	if err != nil {
 		return nil, fmt.Errorf("could not retrieve stored chunk data pack: %w", err)
 	}
@@ -121,7 +127,7 @@ func (ch *ChunkDataPacks) BatchRemove(chunkID flow.Identifier, batch storage.Bat
 }
 
 func (ch *ChunkDataPacks) batchRemove(chunkID flow.Identifier, batch pebble.Writer) error {
-	return operations.RemoveChunkDataPack(chunkID)(batch)
+	return operation.RemoveChunkDataPack(chunkID)(batch)
 }
 
 func (ch *ChunkDataPacks) batchStore(c *flow.ChunkDataPack, batch *Batch) error {
@@ -130,7 +136,7 @@ func (ch *ChunkDataPacks) batchStore(c *flow.ChunkDataPack, batch *Batch) error
 	batch.OnSucceed(func() {
 		ch.byChunkIDCache.Insert(sc.ChunkID, sc)
 	})
-	err := operations.InsertChunkDataPack(sc)(writer)
+	err := operation.InsertChunkDataPack(sc)(writer)
 	if err != nil {
 		return fmt.Errorf("failed to store chunk data pack: %w", err)
 	}
diff --git a/storage/pebble/operations/chunk_data_pack.go b/storage/pebble/operation/chunk_data_pack.go
similarity index 88%
rename from
storage/pebble/operations/chunk_data_pack.go rename to storage/pebble/operation/chunk_data_pack.go index 7b93f72d61a..f5cec13cdbe 100644 --- a/storage/pebble/operations/chunk_data_pack.go +++ b/storage/pebble/operation/chunk_data_pack.go @@ -1,4 +1,4 @@ -package operations +package operation import ( "github.com/cockroachdb/pebble" @@ -29,3 +29,7 @@ func RemoveChunkDataPack(chunkID flow.Identifier) func(w pebble.Writer) error { return w.Delete(key, nil) } } + +func makeKey(prefix byte, identifier flow.Identifier) []byte { + return append([]byte{prefix}, identifier[:]...) +} diff --git a/storage/pebble/operations/codes.go b/storage/pebble/operation/codes.go similarity index 65% rename from storage/pebble/operations/codes.go rename to storage/pebble/operation/codes.go index 0f803e5e0c0..1d9057646c3 100644 --- a/storage/pebble/operations/codes.go +++ b/storage/pebble/operation/codes.go @@ -1,4 +1,4 @@ -package operations +package operation const ( codeChunkDataPack = 100 diff --git a/storage/pebble/operations/common.go b/storage/pebble/operation/common.go similarity index 86% rename from storage/pebble/operations/common.go rename to storage/pebble/operation/common.go index a093ca459c7..ad9e96c2c8b 100644 --- a/storage/pebble/operations/common.go +++ b/storage/pebble/operation/common.go @@ -1,4 +1,4 @@ -package operations +package operation import ( "errors" @@ -6,7 +6,6 @@ import ( "github.com/cockroachdb/pebble" "github.com/vmihailenco/msgpack" - "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module/irrecoverable" "github.com/onflow/flow-go/storage" ) @@ -43,10 +42,6 @@ func retrieve(key []byte, sc interface{}) func(r pebble.Reader) error { } } -func makeKey(prefix byte, identifier flow.Identifier) []byte { - return append([]byte{prefix}, identifier[:]...) 
-} - func convertNotFoundError(err error) error { if errors.Is(err, pebble.ErrNotFound) { return storage.ErrNotFound From 6bd2e01996777d10c3b61c61bbd652d63776af62 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20M=C3=BCller?= Date: Wed, 12 Jun 2024 09:55:57 -0700 Subject: [PATCH 17/19] Update to Cadence v1.0.0-preview.34 --- go.mod | 4 ++-- go.sum | 8 ++++---- insecure/go.mod | 4 ++-- insecure/go.sum | 8 ++++---- integration/go.mod | 4 ++-- integration/go.sum | 8 ++++---- 6 files changed, 18 insertions(+), 18 deletions(-) diff --git a/go.mod b/go.mod index 6f22a64920c..b29c3361983 100644 --- a/go.mod +++ b/go.mod @@ -47,12 +47,12 @@ require ( github.com/multiformats/go-multiaddr-dns v0.3.1 github.com/multiformats/go-multihash v0.2.3 github.com/onflow/atree v0.7.0-rc.2 - github.com/onflow/cadence v1.0.0-preview.33 + github.com/onflow/cadence v1.0.0-preview.34 github.com/onflow/crypto v0.25.1 github.com/onflow/flow v0.3.4 github.com/onflow/flow-core-contracts/lib/go/contracts v1.1.0 github.com/onflow/flow-core-contracts/lib/go/templates v1.0.0 - github.com/onflow/flow-go-sdk v1.0.0-preview.35 + github.com/onflow/flow-go-sdk v1.0.0-preview.36 github.com/onflow/flow/protobuf/go/flow v0.4.4 github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 github.com/pierrec/lz4 v2.6.1+incompatible diff --git a/go.sum b/go.sum index 0883c06396a..c0ec089e15a 100644 --- a/go.sum +++ b/go.sum @@ -2171,8 +2171,8 @@ github.com/onflow/atree v0.7.0-rc.2/go.mod h1:xvP61FoOs95K7IYdIYRnNcYQGf4nbF/uuJ github.com/onflow/boxo v0.0.0-20240201202436-f2477b92f483 h1:LpiQhTAfM9CAmNVEs0n//cBBgCg+vJSiIxTHYUklZ84= github.com/onflow/boxo v0.0.0-20240201202436-f2477b92f483/go.mod h1:pIZgTWdm3k3pLF9Uq6MB8JEcW07UDwNJjlXW1HELW80= github.com/onflow/cadence v1.0.0-M3/go.mod h1:odXGZZ/wGNA5mwT8bC9v8u8EXACHllB2ABSZK65TGL8= -github.com/onflow/cadence v1.0.0-preview.33 h1:kqkU+9//PRsyL3SMokeK2mStarZVxiwrGypyiOX/On8= -github.com/onflow/cadence v1.0.0-preview.33/go.mod h1:jOwvPSSLTr9TvaKMs7KKiBYMmpdpNNAFxBsjMlrqVD0= +github.com/onflow/cadence v1.0.0-preview.34 h1:MJSli75W6LJVUqSx/tq4MQe64H1+EcQBD/sNgpOO4jE= +github.com/onflow/cadence v1.0.0-preview.34/go.mod h1:jOwvPSSLTr9TvaKMs7KKiBYMmpdpNNAFxBsjMlrqVD0= github.com/onflow/crypto v0.25.0/go.mod h1:C8FbaX0x8y+FxWjbkHy0Q4EASCDR9bSPWZqlpCLYyVI= github.com/onflow/crypto v0.25.1 h1:0txy2PKPMM873JbpxQNbJmuOJtD56bfs48RQfm0ts5A= github.com/onflow/crypto v0.25.1/go.mod h1:C8FbaX0x8y+FxWjbkHy0Q4EASCDR9bSPWZqlpCLYyVI= @@ -2187,8 +2187,8 @@ github.com/onflow/flow-ft/lib/go/contracts v1.0.0/go.mod h1:PwsL8fC81cjnUnTfmyL/ github.com/onflow/flow-ft/lib/go/templates v1.0.0 h1:6cMS/lUJJ17HjKBfMO/eh0GGvnpElPgBXx7h5aoWJhs= github.com/onflow/flow-ft/lib/go/templates v1.0.0/go.mod h1:uQ8XFqmMK2jxyBSVrmyuwdWjTEb+6zGjRYotfDJ5pAE= github.com/onflow/flow-go-sdk v1.0.0-M1/go.mod h1:TDW0MNuCs4SvqYRUzkbRnRmHQL1h4X8wURsCw9P9beo= -github.com/onflow/flow-go-sdk v1.0.0-preview.35 h1:2ptBhFYFGOaYghZTRbj51BbYqTZjkyEpXDyaWDYrHwA= -github.com/onflow/flow-go-sdk v1.0.0-preview.35/go.mod h1:/G8vtAekhvgynLYVDtd6OnhixoGTzzknmhYCJB2YWWU= +github.com/onflow/flow-go-sdk v1.0.0-preview.36 h1:3g72MjmZPEEVAbtDATbjwqKoNSB7yHLWswUHSAB5zwQ= +github.com/onflow/flow-go-sdk v1.0.0-preview.36/go.mod h1:mjkXIluC+kseYyd8Z1aTq73IiffAUeoY5fuX/C2Z+1w= github.com/onflow/flow-nft/lib/go/contracts v1.2.1 h1:woAAS5z651sDpi7ihAHll8NvRS9uFXIXkL6xR+bKFZY= github.com/onflow/flow-nft/lib/go/contracts v1.2.1/go.mod h1:2gpbza+uzs1k7x31hkpBPlggIRkI53Suo0n2AyA2HcE= github.com/onflow/flow-nft/lib/go/templates v1.2.0 
h1:JSQyh9rg0RC+D1930BiRXN8lrtMs+ubVMK6aQPon6Yc= diff --git a/insecure/go.mod b/insecure/go.mod index d4f03241831..534c198e208 100644 --- a/insecure/go.mod +++ b/insecure/go.mod @@ -199,12 +199,12 @@ require ( github.com/multiformats/go-varint v0.0.7 // indirect github.com/olekukonko/tablewriter v0.0.5 // indirect github.com/onflow/atree v0.7.0-rc.2 // indirect - github.com/onflow/cadence v1.0.0-preview.33 // indirect + github.com/onflow/cadence v1.0.0-preview.34 // indirect github.com/onflow/flow-core-contracts/lib/go/contracts v1.1.0 // indirect github.com/onflow/flow-core-contracts/lib/go/templates v1.0.0 // indirect github.com/onflow/flow-ft/lib/go/contracts v1.0.0 // indirect github.com/onflow/flow-ft/lib/go/templates v1.0.0 // indirect - github.com/onflow/flow-go-sdk v1.0.0-preview.35 // indirect + github.com/onflow/flow-go-sdk v1.0.0-preview.36 // indirect github.com/onflow/flow-nft/lib/go/contracts v1.2.1 // indirect github.com/onflow/flow-nft/lib/go/templates v1.2.0 // indirect github.com/onflow/flow/protobuf/go/flow v0.4.4 // indirect diff --git a/insecure/go.sum b/insecure/go.sum index fec8cd1bfb7..f97bb10c543 100644 --- a/insecure/go.sum +++ b/insecure/go.sum @@ -2160,8 +2160,8 @@ github.com/onflow/atree v0.6.1-0.20230711151834-86040b30171f/go.mod h1:xvP61FoOs github.com/onflow/atree v0.7.0-rc.2 h1:mZmVrl/zPlfI44EjV3FdR2QwIqT8nz1sCONUBFcML/U= github.com/onflow/atree v0.7.0-rc.2/go.mod h1:xvP61FoOs95K7IYdIYRnNcYQGf4nbF/uuJ0tHf4DRuM= github.com/onflow/cadence v1.0.0-M3/go.mod h1:odXGZZ/wGNA5mwT8bC9v8u8EXACHllB2ABSZK65TGL8= -github.com/onflow/cadence v1.0.0-preview.33 h1:kqkU+9//PRsyL3SMokeK2mStarZVxiwrGypyiOX/On8= -github.com/onflow/cadence v1.0.0-preview.33/go.mod h1:jOwvPSSLTr9TvaKMs7KKiBYMmpdpNNAFxBsjMlrqVD0= +github.com/onflow/cadence v1.0.0-preview.34 h1:MJSli75W6LJVUqSx/tq4MQe64H1+EcQBD/sNgpOO4jE= +github.com/onflow/cadence v1.0.0-preview.34/go.mod h1:jOwvPSSLTr9TvaKMs7KKiBYMmpdpNNAFxBsjMlrqVD0= github.com/onflow/crypto v0.25.0/go.mod h1:C8FbaX0x8y+FxWjbkHy0Q4EASCDR9bSPWZqlpCLYyVI= github.com/onflow/crypto v0.25.1 h1:0txy2PKPMM873JbpxQNbJmuOJtD56bfs48RQfm0ts5A= github.com/onflow/crypto v0.25.1/go.mod h1:C8FbaX0x8y+FxWjbkHy0Q4EASCDR9bSPWZqlpCLYyVI= @@ -2174,8 +2174,8 @@ github.com/onflow/flow-ft/lib/go/contracts v1.0.0/go.mod h1:PwsL8fC81cjnUnTfmyL/ github.com/onflow/flow-ft/lib/go/templates v1.0.0 h1:6cMS/lUJJ17HjKBfMO/eh0GGvnpElPgBXx7h5aoWJhs= github.com/onflow/flow-ft/lib/go/templates v1.0.0/go.mod h1:uQ8XFqmMK2jxyBSVrmyuwdWjTEb+6zGjRYotfDJ5pAE= github.com/onflow/flow-go-sdk v1.0.0-M1/go.mod h1:TDW0MNuCs4SvqYRUzkbRnRmHQL1h4X8wURsCw9P9beo= -github.com/onflow/flow-go-sdk v1.0.0-preview.35 h1:2ptBhFYFGOaYghZTRbj51BbYqTZjkyEpXDyaWDYrHwA= -github.com/onflow/flow-go-sdk v1.0.0-preview.35/go.mod h1:/G8vtAekhvgynLYVDtd6OnhixoGTzzknmhYCJB2YWWU= +github.com/onflow/flow-go-sdk v1.0.0-preview.36 h1:3g72MjmZPEEVAbtDATbjwqKoNSB7yHLWswUHSAB5zwQ= +github.com/onflow/flow-go-sdk v1.0.0-preview.36/go.mod h1:mjkXIluC+kseYyd8Z1aTq73IiffAUeoY5fuX/C2Z+1w= github.com/onflow/flow-nft/lib/go/contracts v1.2.1 h1:woAAS5z651sDpi7ihAHll8NvRS9uFXIXkL6xR+bKFZY= github.com/onflow/flow-nft/lib/go/contracts v1.2.1/go.mod h1:2gpbza+uzs1k7x31hkpBPlggIRkI53Suo0n2AyA2HcE= github.com/onflow/flow-nft/lib/go/templates v1.2.0 h1:JSQyh9rg0RC+D1930BiRXN8lrtMs+ubVMK6aQPon6Yc= diff --git a/integration/go.mod b/integration/go.mod index 138d2e0374c..3a706343122 100644 --- a/integration/go.mod +++ b/integration/go.mod @@ -19,13 +19,13 @@ require ( github.com/ipfs/go-datastore v0.6.0 github.com/ipfs/go-ds-badger2 v0.1.3 
github.com/libp2p/go-libp2p v0.32.2 - github.com/onflow/cadence v1.0.0-preview.33 + github.com/onflow/cadence v1.0.0-preview.34 github.com/onflow/crypto v0.25.1 github.com/onflow/flow-core-contracts/lib/go/contracts v1.1.0 github.com/onflow/flow-core-contracts/lib/go/templates v1.0.0 github.com/onflow/flow-emulator v1.0.0-preview.24 github.com/onflow/flow-go v0.35.5-0.20240517202625-55f862b45dfd - github.com/onflow/flow-go-sdk v1.0.0-preview.35 + github.com/onflow/flow-go-sdk v1.0.0-preview.36 github.com/onflow/flow-go/insecure v0.0.0-00010101000000-000000000000 github.com/onflow/flow/protobuf/go/flow v0.4.4 github.com/onflow/go-ethereum v1.13.4 diff --git a/integration/go.sum b/integration/go.sum index 310bad73ae6..b264c2122a4 100644 --- a/integration/go.sum +++ b/integration/go.sum @@ -2150,8 +2150,8 @@ github.com/onflow/atree v0.6.1-0.20230711151834-86040b30171f/go.mod h1:xvP61FoOs github.com/onflow/atree v0.7.0-rc.2 h1:mZmVrl/zPlfI44EjV3FdR2QwIqT8nz1sCONUBFcML/U= github.com/onflow/atree v0.7.0-rc.2/go.mod h1:xvP61FoOs95K7IYdIYRnNcYQGf4nbF/uuJ0tHf4DRuM= github.com/onflow/cadence v1.0.0-M3/go.mod h1:odXGZZ/wGNA5mwT8bC9v8u8EXACHllB2ABSZK65TGL8= -github.com/onflow/cadence v1.0.0-preview.33 h1:kqkU+9//PRsyL3SMokeK2mStarZVxiwrGypyiOX/On8= -github.com/onflow/cadence v1.0.0-preview.33/go.mod h1:jOwvPSSLTr9TvaKMs7KKiBYMmpdpNNAFxBsjMlrqVD0= +github.com/onflow/cadence v1.0.0-preview.34 h1:MJSli75W6LJVUqSx/tq4MQe64H1+EcQBD/sNgpOO4jE= +github.com/onflow/cadence v1.0.0-preview.34/go.mod h1:jOwvPSSLTr9TvaKMs7KKiBYMmpdpNNAFxBsjMlrqVD0= github.com/onflow/crypto v0.25.0/go.mod h1:C8FbaX0x8y+FxWjbkHy0Q4EASCDR9bSPWZqlpCLYyVI= github.com/onflow/crypto v0.25.1 h1:0txy2PKPMM873JbpxQNbJmuOJtD56bfs48RQfm0ts5A= github.com/onflow/crypto v0.25.1/go.mod h1:C8FbaX0x8y+FxWjbkHy0Q4EASCDR9bSPWZqlpCLYyVI= @@ -2166,8 +2166,8 @@ github.com/onflow/flow-ft/lib/go/contracts v1.0.0/go.mod h1:PwsL8fC81cjnUnTfmyL/ github.com/onflow/flow-ft/lib/go/templates v1.0.0 h1:6cMS/lUJJ17HjKBfMO/eh0GGvnpElPgBXx7h5aoWJhs= github.com/onflow/flow-ft/lib/go/templates v1.0.0/go.mod h1:uQ8XFqmMK2jxyBSVrmyuwdWjTEb+6zGjRYotfDJ5pAE= github.com/onflow/flow-go-sdk v1.0.0-M1/go.mod h1:TDW0MNuCs4SvqYRUzkbRnRmHQL1h4X8wURsCw9P9beo= -github.com/onflow/flow-go-sdk v1.0.0-preview.35 h1:2ptBhFYFGOaYghZTRbj51BbYqTZjkyEpXDyaWDYrHwA= -github.com/onflow/flow-go-sdk v1.0.0-preview.35/go.mod h1:/G8vtAekhvgynLYVDtd6OnhixoGTzzknmhYCJB2YWWU= +github.com/onflow/flow-go-sdk v1.0.0-preview.36 h1:3g72MjmZPEEVAbtDATbjwqKoNSB7yHLWswUHSAB5zwQ= +github.com/onflow/flow-go-sdk v1.0.0-preview.36/go.mod h1:mjkXIluC+kseYyd8Z1aTq73IiffAUeoY5fuX/C2Z+1w= github.com/onflow/flow-nft/lib/go/contracts v1.2.1 h1:woAAS5z651sDpi7ihAHll8NvRS9uFXIXkL6xR+bKFZY= github.com/onflow/flow-nft/lib/go/contracts v1.2.1/go.mod h1:2gpbza+uzs1k7x31hkpBPlggIRkI53Suo0n2AyA2HcE= github.com/onflow/flow-nft/lib/go/templates v1.2.0 h1:JSQyh9rg0RC+D1930BiRXN8lrtMs+ubVMK6aQPon6Yc= From 053e77ccfd828e4592a164c569c7bab2540e9555 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20M=C3=BCller?= Date: Wed, 12 Jun 2024 10:27:37 -0700 Subject: [PATCH 18/19] adjust tests: add new visits, account for them in metrics --- .../cadence_values_migration_test.go | 64 +++++++++++++++++++ .../migration_matrics_collector_test.go | 4 +- 2 files changed, 66 insertions(+), 2 deletions(-) diff --git a/cmd/util/ledger/migrations/cadence_values_migration_test.go b/cmd/util/ledger/migrations/cadence_values_migration_test.go index f63d2fa99e5..882952b9362 100644 --- a/cmd/util/ledger/migrations/cadence_values_migration_test.go +++ 
b/cmd/util/ledger/migrations/cadence_values_migration_test.go @@ -457,6 +457,70 @@ func checkMigratedState( storageMapKey: interpreter.Uint64StorageMapKey(0x3), value: "StorageCapabilityController(borrowType: Type<&A.0ae53cb6e3f42a79.FlowToken.Vault>(), capabilityID: 3, target: /storage/flowTokenVault)", }, + { + storageKey: interpreter.StorageKey{ + Key: "path_cap", + Address: address, + }, + storageMapKey: interpreter.StringStorageMapKey("flowTokenVault"), + value: "3", + }, + { + storageKey: interpreter.StorageKey{ + Key: "path_cap", + Address: address, + }, + storageMapKey: interpreter.StringStorageMapKey("flowTokenVault"), + value: "1", + }, + { + storageKey: interpreter.StorageKey{ + Key: "path_cap", + Address: address, + }, + storageMapKey: interpreter.StringStorageMapKey("flowTokenVault"), + value: "nil", + }, + { + storageKey: interpreter.StorageKey{ + Key: "path_cap", + Address: address, + }, + storageMapKey: interpreter.StringStorageMapKey("flowTokenVault"), + value: "nil", + }, + { + storageKey: interpreter.StorageKey{ + Key: "path_cap", + Address: address, + }, + storageMapKey: interpreter.StringStorageMapKey("flowTokenVault"), + value: "{3: nil, 1: nil}", + }, + { + storageKey: interpreter.StorageKey{ + Key: "path_cap", + Address: address, + }, + storageMapKey: interpreter.StringStorageMapKey("r"), + value: "2", + }, + { + storageKey: interpreter.StorageKey{ + Key: "path_cap", + Address: address, + }, + storageMapKey: interpreter.StringStorageMapKey("r"), + value: "nil", + }, + { + storageKey: interpreter.StorageKey{ + Key: "path_cap", + Address: address, + }, + storageMapKey: interpreter.StringStorageMapKey("r"), + value: "{2: nil}", + }, }, visitMigration.visits, ) diff --git a/cmd/util/ledger/migrations/migration_matrics_collector_test.go b/cmd/util/ledger/migrations/migration_matrics_collector_test.go index 1bd70a82739..29afafb19ef 100644 --- a/cmd/util/ledger/migrations/migration_matrics_collector_test.go +++ b/cmd/util/ledger/migrations/migration_matrics_collector_test.go @@ -87,7 +87,7 @@ func TestMigrationMetricsCollection(t *testing.T) { require.Equal( t, Metrics{ - TotalValues: 752, + TotalValues: 789, TotalErrors: 6, ErrorsPerContract: map[string]int{ "A.01cf0e2f2f715450.Test": 6, @@ -187,7 +187,7 @@ func TestMigrationMetricsCollection(t *testing.T) { require.Equal( t, Metrics{ - TotalValues: 752, + TotalValues: 789, TotalErrors: 6, ErrorsPerContract: map[string]int{ "A.01cf0e2f2f715450.Test": 6, From b86ca1c87b0a63b0a41a9a8260bb5f4efc14c65b Mon Sep 17 00:00:00 2001 From: Peter Argue <89119817+peterargue@users.noreply.github.com> Date: Wed, 12 Jun 2024 10:41:09 -0700 Subject: [PATCH 19/19] [CI] Remove cruise control overrides in Access integration tests --- integration/tests/access/cohort3/grpc_state_stream_test.go | 1 - integration/tests/access/cohort3/grpc_streaming_blocks_test.go | 1 - 2 files changed, 2 deletions(-) diff --git a/integration/tests/access/cohort3/grpc_state_stream_test.go b/integration/tests/access/cohort3/grpc_state_stream_test.go index d4f77b9245d..2f2d883bb1e 100644 --- a/integration/tests/access/cohort3/grpc_state_stream_test.go +++ b/integration/tests/access/cohort3/grpc_state_stream_test.go @@ -114,7 +114,6 @@ func (s *GrpcStateStreamSuite) SetupTest() { testnet.AsGhost()) consensusConfigs := []func(config *testnet.NodeConfig){ - testnet.WithAdditionalFlag("--cruise-ctl-fallback-proposal-duration=400ms"), testnet.WithAdditionalFlag(fmt.Sprintf("--required-verification-seal-approvals=%d", 1)), 
testnet.WithAdditionalFlag(fmt.Sprintf("--required-construction-seal-approvals=%d", 1)), testnet.WithLogLevel(zerolog.FatalLevel), diff --git a/integration/tests/access/cohort3/grpc_streaming_blocks_test.go b/integration/tests/access/cohort3/grpc_streaming_blocks_test.go index 1cc6139676b..96c7655406a 100644 --- a/integration/tests/access/cohort3/grpc_streaming_blocks_test.go +++ b/integration/tests/access/cohort3/grpc_streaming_blocks_test.go @@ -74,7 +74,6 @@ func (s *GrpcBlocksStreamSuite) SetupTest() { ) consensusConfigs := []func(config *testnet.NodeConfig){ - testnet.WithAdditionalFlag("--cruise-ctl-fallback-proposal-duration=400ms"), testnet.WithAdditionalFlag(fmt.Sprintf("--required-verification-seal-approvals=%d", 1)), testnet.WithAdditionalFlag(fmt.Sprintf("--required-construction-seal-approvals=%d", 1)), testnet.WithLogLevel(zerolog.FatalLevel),
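
Note on the storage/pebble rename in PATCH 16 above: the relocated makeKey helper defines the on-disk key layout for chunk data packs, a single table-code byte followed by the raw identifier bytes. The following is a minimal standalone sketch of that layout, assuming flow.Identifier remains flow-go's 32-byte array type; the main function and its printed output are illustrative only and are not part of the patch:

	package main

	import (
		"fmt"

		"github.com/onflow/flow-go/model/flow"
	)

	// codeChunkDataPack mirrors the constant in storage/pebble/operation/codes.go.
	const codeChunkDataPack = 100

	// makeKey builds a pebble key: one prefix byte followed by the
	// 32-byte identifier, so all entries of one table share a common prefix.
	func makeKey(prefix byte, identifier flow.Identifier) []byte {
		return append([]byte{prefix}, identifier[:]...)
	}

	func main() {
		var chunkID flow.Identifier // zero-valued identifier, for illustration
		key := makeKey(codeChunkDataPack, chunkID)
		fmt.Printf("prefix=%d len=%d\n", key[0], len(key)) // prefix=100 len=33
	}

Because every key begins with its table's code byte, a prefix scan in pebble touches only chunk data packs and no other table, which is presumably why the helper now lives next to its only caller in chunk_data_pack.go rather than in common.go.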