Skip to content

Commit

Permalink
Merge pull request #6087 from onflow/bastian/update-atree-inlining-cadence-v1.0-9
Browse files Browse the repository at this point in the history
  • Loading branch information
turbolent authored Jun 12, 2024
2 parents a9275e2 + 5ea310f commit 3e88278
Show file tree
Hide file tree
Showing 34 changed files with 1,226 additions and 214 deletions.
12 changes: 8 additions & 4 deletions cmd/execution_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -691,7 +691,7 @@ func (exeNode *ExecutionNode) LoadExecutionDataGetter(node *NodeConfig) error {
return nil
}

func openChunkDataPackDB(dbPath string, logger zerolog.Logger) (*badgerDB.DB, error) {
func OpenChunkDataPackDB(dbPath string, logger zerolog.Logger) (*badgerDB.DB, error) {
log := sutil.NewLogger(logger)

opts := badgerDB.
Expand Down Expand Up @@ -722,17 +722,21 @@ func (exeNode *ExecutionNode) LoadExecutionState(
error,
) {

chunkDataPackDB, err := openChunkDataPackDB(exeNode.exeConf.chunkDataPackDir, node.Logger)
chunkDataPackDB, err := storagepebble.OpenDefaultPebbleDB(exeNode.exeConf.chunkDataPackDir)
if err != nil {
return nil, err
return nil, fmt.Errorf("could not open chunk data pack database: %w", err)
}

exeNode.builder.ShutdownFunc(func() error {
if err := chunkDataPackDB.Close(); err != nil {
return fmt.Errorf("error closing chunk data pack database: %w", err)
}
return nil
})
chunkDataPacks := storage.NewChunkDataPacks(node.Metrics.Cache, chunkDataPackDB, node.Storage.Collections, exeNode.exeConf.chunkDataPackCacheSize)
// chunkDataPacks := storage.NewChunkDataPacks(node.Metrics.Cache,
// chunkDataPackDB, node.Storage.Collections, exeNode.exeConf.chunkDataPackCacheSize)
chunkDataPacks := storagepebble.NewChunkDataPacks(node.Metrics.Cache,
chunkDataPackDB, node.Storage.Collections, exeNode.exeConf.chunkDataPackCacheSize)

// Needed for gRPC server, make sure to assign to main scoped vars
exeNode.events = storage.NewEvents(node.Metrics.Cache, node.DB)
Expand Down
23 changes: 10 additions & 13 deletions engine/execution/computation/computer/computer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/onflow/flow-go/ledger/common/pathfinder"
"github.com/onflow/flow-go/ledger/complete"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/epochs"
"github.com/onflow/flow-go/module/executiondatasync/execution_data"
"github.com/onflow/flow-go/module/executiondatasync/provider"
Expand Down Expand Up @@ -155,13 +156,10 @@ func TestBlockExecutor_ExecuteBlock(t *testing.T) {
Times(2) // 1 collection + system collection

exemetrics.On("ExecutionTransactionExecuted",
mock.Anything, // duration
mock.Anything, // conflict retry count
mock.Anything, // computation used
mock.Anything, // memory used
mock.Anything, // number of events
mock.Anything, // size of events
false). // no failure
mock.Anything,
mock.MatchedBy(func(arg module.TransactionExecutionResultStats) bool {
return !arg.Failed // only successful transactions
})).
Return(nil).
Times(2 + 1) // 2 txs in collection + system chunk tx

Expand Down Expand Up @@ -1267,12 +1265,11 @@ func Test_ExecutingSystemCollection(t *testing.T) {

metrics.On("ExecutionTransactionExecuted",
mock.Anything, // duration
mock.Anything, // conflict retry count
mock.Anything, // computation used
mock.Anything, // memory used
expectedNumberOfEvents,
expectedEventSize,
false).
mock.MatchedBy(func(arg module.TransactionExecutionResultStats) bool {
return arg.EventCounts == expectedNumberOfEvents &&
arg.EventSize == expectedEventSize &&
!arg.Failed
})).
Return(nil).
Times(1) // system chunk tx

Expand Down
69 changes: 41 additions & 28 deletions engine/execution/computation/computer/result_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,12 @@ type resultCollector struct {
spockSignatures []crypto.Signature

blockStartTime time.Time
blockStats module.ExecutionResultStats
blockStats module.BlockExecutionResultStats
blockMeter *meter.Meter

currentCollectionStartTime time.Time
currentCollectionState *state.ExecutionState
currentCollectionStats module.ExecutionResultStats
currentCollectionStats module.CollectionExecutionResultStats
currentCollectionStorageSnapshot execution.ExtendableStorageSnapshot
}

Expand Down Expand Up @@ -123,9 +123,7 @@ func newResultCollector(
blockMeter: meter.NewMeter(meter.DefaultParameters()),
currentCollectionStartTime: now,
currentCollectionState: state.NewExecutionState(nil, state.DefaultParameters()),
currentCollectionStats: module.ExecutionResultStats{
NumberOfCollections: 1,
},
currentCollectionStats: module.CollectionExecutionResultStats{},
currentCollectionStorageSnapshot: storehouse.NewExecutingBlockSnapshot(
previousBlockSnapshot,
*block.StartState,
Expand Down Expand Up @@ -201,27 +199,16 @@ func (collector *resultCollector) commitCollection(

collector.spockSignatures = append(collector.spockSignatures, spock)

collector.currentCollectionStats.EventCounts = len(events)
collector.currentCollectionStats.EventSize = events.ByteSize()
collector.currentCollectionStats.NumberOfRegistersTouched = len(
collectionExecutionSnapshot.AllRegisterIDs())
for _, entry := range collectionExecutionSnapshot.UpdatedRegisters() {
collector.currentCollectionStats.NumberOfBytesWrittenToRegisters += len(
entry.Value)
}

collector.metrics.ExecutionCollectionExecuted(
time.Since(startTime),
collector.currentCollectionStats)

collector.blockStats.Merge(collector.currentCollectionStats)
collector.blockStats.Add(collector.currentCollectionStats)
collector.blockMeter.MergeMeter(collectionExecutionSnapshot.Meter)

collector.currentCollectionStartTime = time.Now()
collector.currentCollectionState = state.NewExecutionState(nil, state.DefaultParameters())
collector.currentCollectionStats = module.ExecutionResultStats{
NumberOfCollections: 1,
}
collector.currentCollectionStats = module.CollectionExecutionResultStats{}

for _, consumer := range collector.consumers {
err = consumer.OnExecutedCollection(collector.result.CollectionExecutionResultAt(collection.collectionIndex))
Expand Down Expand Up @@ -269,14 +256,12 @@ func (collector *resultCollector) processTransactionResult(
logger.Info().Msg("transaction executed successfully")
}

collector.metrics.ExecutionTransactionExecuted(
collector.handleTransactionExecutionMetrics(
timeSpent,
output,
txnExecutionSnapshot,
txn,
numConflictRetries,
output.ComputationUsed,
output.MemoryEstimate,
len(output.Events),
flow.EventsList(output.Events).ByteSize(),
output.Err != nil,
)

txnResult := flow.TransactionResult{
Expand All @@ -302,10 +287,6 @@ func (collector *resultCollector) processTransactionResult(
return fmt.Errorf("failed to merge into collection view: %w", err)
}

collector.currentCollectionStats.ComputationUsed += output.ComputationUsed
collector.currentCollectionStats.MemoryUsed += output.MemoryEstimate
collector.currentCollectionStats.NumberOfTransactions += 1

if !txn.lastTransactionInCollection {
return nil
}
Expand All @@ -316,6 +297,38 @@ func (collector *resultCollector) processTransactionResult(
collector.currentCollectionState.Finalize())
}

// handleTransactionExecutionMetrics assembles per-transaction execution stats
// from the procedure output and execution snapshot, reports them to the
// metrics collector, and folds them into the running stats for the current
// collection.
func (collector *resultCollector) handleTransactionExecutionMetrics(
	timeSpent time.Duration,
	output fvm.ProcedureOutput,
	txnExecutionSnapshot *snapshot.ExecutionSnapshot,
	txn TransactionRequest,
	numConflictRetries int,
) {
	// Total byte size of all register values written by this transaction.
	bytesWritten := 0
	for _, reg := range txnExecutionSnapshot.UpdatedRegisters() {
		bytesWritten += len(reg.Value)
	}

	stats := module.TransactionExecutionResultStats{
		ExecutionResultStats: module.ExecutionResultStats{
			ComputationUsed:                 output.ComputationUsed,
			MemoryUsed:                      output.MemoryEstimate,
			EventCounts:                     len(output.Events),
			EventSize:                       output.Events.ByteSize(),
			NumberOfRegistersTouched:        len(txnExecutionSnapshot.AllRegisterIDs()),
			NumberOfBytesWrittenToRegisters: bytesWritten,
		},
		ComputationIntensities:     output.ComputationIntensities,
		NumberOfTxnConflictRetries: numConflictRetries,
		// A non-nil output error marks the transaction as failed.
		Failed:            output.Err != nil,
		SystemTransaction: txn.isSystemTransaction,
	}

	collector.metrics.ExecutionTransactionExecuted(timeSpent, stats)

	// Accumulate into the current collection's stats; the collection commit
	// path later merges these into the block-level stats.
	collector.currentCollectionStats.Add(stats)
}

func (collector *resultCollector) AddTransactionResult(
request TransactionRequest,
snapshot *snapshot.ExecutionSnapshot,
Expand Down
13 changes: 12 additions & 1 deletion fvm/evm/emulator/emulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,17 @@ func (bl *BlockView) DryRunTransaction(
msg.SkipAccountChecks = true

// return without committing the state
return proc.run(msg, tx.Hash(), 0, tx.Type())
txResult, err := proc.run(msg, tx.Hash(), 0, tx.Type())
if txResult.Successful() {
// Adding `gethParams.SstoreSentryGasEIP2200` is needed for this condition:
// https://github.com/onflow/go-ethereum/blob/master/core/vm/operations_acl.go#L29-L32
txResult.GasConsumed += gethParams.SstoreSentryGasEIP2200
// Take into account any gas refunds, which are calculated only after
// transaction execution.
txResult.GasConsumed += txResult.GasRefund
}

return txResult, err
}

func (bl *BlockView) newProcedure() (*procedure, error) {
Expand Down Expand Up @@ -522,6 +532,7 @@ func (proc *procedure) run(
// if prechecks are passed, the exec result won't be nil
if execResult != nil {
res.GasConsumed = execResult.UsedGas
res.GasRefund = proc.state.GetRefund()
res.Index = uint16(txIndex)
// we need to capture the returned value no matter the status
// if the tx is reverted the error message is returned as returned value
Expand Down
Loading

0 comments on commit 3e88278

Please sign in to comment.