Skip to content

Commit

Permalink
Merge pull request #5787 from onflow/v0.33-new-ingestion-engine
Browse files Browse the repository at this point in the history
v0.33 add new execution ingestion engine
  • Loading branch information
zhangchiqing authored May 2, 2024
2 parents 9491f76 + cccee8f commit d75160c
Show file tree
Hide file tree
Showing 18 changed files with 2,192 additions and 29 deletions.
30 changes: 25 additions & 5 deletions cmd/execution_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@ type ExecutionNode struct {
followerEng *followereng.ComplianceEngine // to sync blocks from consensus nodes
computationManager *computation.Manager
collectionRequester *requester.Engine
ingestionEng *ingestion.Engine
scriptsEng *scripts.Engine
followerDistributor *pubsub.FollowerDistributor
checkAuthorizedAtBlock func(blockID flow.Identifier) (bool, error)
Expand Down Expand Up @@ -1096,14 +1095,35 @@ func (exeNode *ExecutionNode) LoadIngestionEngine(
}

fetcher := fetcher.NewCollectionFetcher(node.Logger, exeNode.collectionRequester, node.State, exeNode.exeConf.onflowOnlyLNs)
if exeNode.exeConf.enableNewIngestionEngine {
_, core, err := ingestion.NewMachine(
node.Logger,
node.ProtocolEvents,
exeNode.collectionRequester,
fetcher,
node.Storage.Headers,
node.Storage.Blocks,
node.Storage.Collections,
exeNode.executionState,
node.State,
exeNode.collector,
exeNode.computationManager,
exeNode.providerEngine,
exeNode.blockDataUploader,
exeNode.stopControl,
)

return core, err
}

var blockLoader ingestion.BlockLoader
if exeNode.exeConf.enableStorehouse {
blockLoader = loader.NewUnfinalizedLoader(node.Logger, node.State, node.Storage.Headers, exeNode.executionState)
} else {
blockLoader = loader.NewUnexecutedLoader(node.Logger, node.State, node.Storage.Headers, exeNode.executionState)
}

exeNode.ingestionEng, err = ingestion.New(
ingestionEng, err := ingestion.New(
exeNode.ingestionUnit,
node.Logger,
node.EngineRegistry,
Expand All @@ -1126,11 +1146,11 @@ func (exeNode *ExecutionNode) LoadIngestionEngine(

// TODO: we should solve these mutual dependencies better
// => https://github.com/dapperlabs/flow-go/issues/4360
exeNode.collectionRequester = exeNode.collectionRequester.WithHandle(exeNode.ingestionEng.OnCollection)
exeNode.collectionRequester.WithHandle(ingestionEng.OnCollection)

node.ProtocolEvents.AddConsumer(exeNode.ingestionEng)
node.ProtocolEvents.AddConsumer(ingestionEng)

return exeNode.ingestionEng, err
return ingestionEng, err
}

// create scripts engine for handling script execution
Expand Down
8 changes: 5 additions & 3 deletions cmd/execution_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,10 @@ type ExecutionConfig struct {
// It works around an issue where some collection nodes are not configured with enough
// this works around an issue where some collection nodes are not configured with enough
// file descriptors causing connection failures.
onflowOnlyLNs bool
enableStorehouse bool
enableChecker bool
onflowOnlyLNs bool
enableStorehouse bool
enableChecker bool
enableNewIngestionEngine bool
}

func (exeConf *ExecutionConfig) SetupFlags(flags *pflag.FlagSet) {
Expand Down Expand Up @@ -120,6 +121,7 @@ func (exeConf *ExecutionConfig) SetupFlags(flags *pflag.FlagSet) {
flags.BoolVar(&exeConf.onflowOnlyLNs, "temp-onflow-only-lns", false, "do not use unless required. forces node to only request collections from onflow collection nodes")
flags.BoolVar(&exeConf.enableStorehouse, "enable-storehouse", false, "enable storehouse to store registers on disk, default is false")
flags.BoolVar(&exeConf.enableChecker, "enable-checker", true, "enable checker to check the correctness of the execution result, default is true")
flags.BoolVar(&exeConf.enableNewIngestionEngine, "enable-new-ingestion-engine", false, "enable new ingestion engine, default is false")

}

Expand Down
3 changes: 1 addition & 2 deletions engine/common/requester/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,8 @@ func New(log zerolog.Logger, metrics module.EngineMetrics, net network.EngineReg
// function. It is done in a separate call so that the requester can be injected
// into engines upon construction, and then provide a handle function to the
// requester from that engine itself.
func (e *Engine) WithHandle(handle HandleFunc) *Engine {
func (e *Engine) WithHandle(handle HandleFunc) {
e.handle = handle
return e
}

// Ready returns a ready channel that is closed once the engine has fully
Expand Down
Loading

0 comments on commit d75160c

Please sign in to comment.