From c08159aad32106662b445d3dc8791138a9c0470d Mon Sep 17 00:00:00 2001 From: rian Date: Fri, 9 Aug 2024 15:42:03 +0300 Subject: [PATCH] Plugin --- blockchain/blockchain.go | 21 ++++++++++++++++- cmd/juno/juno.go | 4 ++++ core/state.go | 4 ++-- go.mod | 1 + grpc/gen/kv_grpc.pb.go | 2 +- node/node.go | 11 +++++++++ plugin/plugin.go | 43 +++++++++++++++++++++++++++++++++ sync/sync.go | 51 +++++++++++++++++++++++++++++++++++++++- 8 files changed, 132 insertions(+), 5 deletions(-) create mode 100644 plugin/plugin.go diff --git a/blockchain/blockchain.go b/blockchain/blockchain.go index ee2411c471..7f0d4024ac 100644 --- a/blockchain/blockchain.go +++ b/blockchain/blockchain.go @@ -828,6 +828,26 @@ func (b *Blockchain) RevertHead() error { return b.database.Update(b.revertHead) } +func (b *Blockchain) GetReverseStateDiff() (*core.StateDiff, error) { + var reverseStateDiff *core.StateDiff + return reverseStateDiff, b.database.View(func(txn db.Transaction) error { + blockNumber, err := chainHeight(txn) + if err != nil { + return err + } + stateUpdate, err := stateUpdateByNumber(txn, blockNumber) + if err != nil { + return err + } + state := core.NewState(txn) + reverseStateDiff, err = state.BuildReverseDiff(blockNumber, stateUpdate.StateDiff) + if err != nil { + return err + } + return nil + }) +} + func (b *Blockchain) revertHead(txn db.Transaction) error { blockNumber, err := chainHeight(txn) if err != nil { @@ -874,7 +894,6 @@ func (b *Blockchain) revertHead(txn db.Transaction) error { } // Revert chain height and pending. - if genesisBlock { if err = txn.Delete(db.Pending.Key()); err != nil { return err diff --git a/cmd/juno/juno.go b/cmd/juno/juno.go index c61ee40529..1c8e046196 100644 --- a/cmd/juno/juno.go +++ b/cmd/juno/juno.go @@ -82,6 +82,7 @@ const ( callMaxStepsF = "rpc-call-max-steps" corsEnableF = "rpc-cors-enable" versionedConstantsFileF = "versioned-constants-file" + pluginPathF = "plugin-path" defaultConfig = "" defaulHost = "localhost" @@ -119,6 +120,7 @@ const ( defaultGwTimeout = 5 * time.Second defaultCorsEnable = false defaultVersionedConstantsFile = "" + defaultPluginPath = "" configFlagUsage = "The YAML configuration file." logLevelFlagUsage = "Options: trace, debug, info, warn, error." @@ -170,6 +172,7 @@ const ( "The upper limit is 4 million steps, and any higher value will still be capped at 4 million." corsEnableUsage = "Enable CORS on RPC endpoints" versionedConstantsFileUsage = "Use custom versioned constants from provided file" + pluginPathUsage = "Path to the plugins .so file" ) var Version string @@ -355,6 +358,7 @@ func NewCmd(config *node.Config, run func(*cobra.Command, []string) error) *cobr junoCmd.Flags().Bool(corsEnableF, defaultCorsEnable, corsEnableUsage) junoCmd.Flags().String(versionedConstantsFileF, defaultVersionedConstantsFile, versionedConstantsFileUsage) junoCmd.MarkFlagsMutuallyExclusive(p2pFeederNodeF, p2pPeersF) + junoCmd.Flags().String(pluginPathF, defaultPluginPath, pluginPathUsage) junoCmd.AddCommand(GenP2PKeyPair(), DBCmd(defaultDBPath)) diff --git a/core/state.go b/core/state.go index 1518ca49ce..6b38c963ab 100644 --- a/core/state.go +++ b/core/state.go @@ -544,7 +544,7 @@ func (s *State) Revert(blockNumber uint64, update *StateUpdate) error { } // update contracts - reversedDiff, err := s.buildReverseDiff(blockNumber, update.StateDiff) + reversedDiff, err := s.BuildReverseDiff(blockNumber, update.StateDiff) if err != nil { return fmt.Errorf("build reverse diff: %v", err) } @@ -660,7 +660,7 @@ func (s *State) purgeContract(addr *felt.Felt) error { return storageCloser() } -func (s *State) buildReverseDiff(blockNumber uint64, diff *StateDiff) (*StateDiff, error) { +func (s *State) BuildReverseDiff(blockNumber uint64, diff *StateDiff) (*StateDiff, error) { reversed := *diff // storage diffs diff --git a/go.mod b/go.mod index 365de44244..47203fe597 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/ethereum/go-ethereum v1.14.7 github.com/fxamacker/cbor/v2 v2.7.0 github.com/go-playground/validator/v10 v10.22.0 + github.com/golang/protobuf v1.5.4 github.com/jinzhu/copier v0.4.0 github.com/libp2p/go-libp2p v0.34.0 github.com/libp2p/go-libp2p-kad-dht v0.25.2 diff --git a/grpc/gen/kv_grpc.pb.go b/grpc/gen/kv_grpc.pb.go index d64c885b5c..b398effbb7 100644 --- a/grpc/gen/kv_grpc.pb.go +++ b/grpc/gen/kv_grpc.pb.go @@ -11,7 +11,7 @@ import ( grpc "google.golang.org/grpc" codes "google.golang.org/grpc/codes" status "google.golang.org/grpc/status" - emptypb "google.golang.org/protobuf/types/known/emptypb" + emptypb "github.com/golang/protobuf/ptypes/empty" ) // This is a compile-time assertion to ensure that this generated file diff --git a/node/node.go b/node/node.go index 1d61267b35..b4c77eebcf 100644 --- a/node/node.go +++ b/node/node.go @@ -21,6 +21,7 @@ import ( "github.com/NethermindEth/juno/l1" "github.com/NethermindEth/juno/migration" "github.com/NethermindEth/juno/p2p" + junoplugin "github.com/NethermindEth/juno/plugin" "github.com/NethermindEth/juno/rpc" "github.com/NethermindEth/juno/service" adaptfeeder "github.com/NethermindEth/juno/starknetdata/feeder" @@ -87,6 +88,8 @@ type Config struct { GatewayAPIKey string `mapstructure:"gw-api-key"` GatewayTimeout time.Duration `mapstructure:"gw-timeout"` + + PluginPath string `mapstructure:"plugin-path"` } type Node struct { @@ -155,6 +158,14 @@ func New(cfg *Config, version string) (*Node, error) { //nolint:gocyclo,funlen synchronizer := sync.New(chain, adaptfeeder.New(client), log, cfg.PendingPollInterval, dbIsRemote) gatewayClient := gateway.NewClient(cfg.Network.GatewayURL, log).WithUserAgent(ua).WithAPIKey(cfg.GatewayAPIKey) + if cfg.PluginPath != "" { + plugin, err := junoplugin.Load(cfg.PluginPath) + if err != nil { + return nil, err + } + synchronizer.WithPlugin(plugin) + } + var p2pService *p2p.Service if cfg.P2P { if cfg.Network != utils.Sepolia { diff --git a/plugin/plugin.go b/plugin/plugin.go new file mode 100644 index 0000000000..5ccf984e2d --- /dev/null +++ b/plugin/plugin.go @@ -0,0 +1,43 @@ +package junoplugin + +import ( + "fmt" + "plugin" + + "github.com/NethermindEth/juno/core" + "github.com/NethermindEth/juno/core/felt" +) + +//go:generate mockgen -destination=../mocks/mock_plugin.go -package=mocks github.com/NethermindEth/juno/plugin JunoPlugin +type JunoPlugin interface { + Init() error + Shutdown() error // Todo: Currently this function will never be called. + NewBlock(block *core.Block, stateUpdate *core.StateUpdate, newClasses map[felt.Felt]core.Class) error + // The state is reverted by applying a write operation with the reverseStateDiff's StorageDiffs, Nonces, and ReplacedClasses, + // and a delete option with its DeclaredV0Classes, DeclaredV1Classes, and ReplacedClasses. + RevertBlock(from, to *BlockAndStateUpdate, reverseStateDiff *core.StateDiff) error +} + +type BlockAndStateUpdate struct { + Block *core.Block + StateUpdate *core.StateUpdate +} + +func Load(pluginPath string) (JunoPlugin, error) { + plug, err := plugin.Open(pluginPath) + if err != nil { + return nil, fmt.Errorf("error loading plugin .so file: %w", err) + } + + symPlugin, err := plug.Lookup("JunoPluginInstance") + if err != nil { + return nil, fmt.Errorf("error looking up PluginInstance: %w", err) + } + + pluginInstance, ok := symPlugin.(JunoPlugin) + if !ok { + return nil, fmt.Errorf("the plugin does not staisfy the required interface") + } + + return pluginInstance, pluginInstance.Init() +} diff --git a/sync/sync.go b/sync/sync.go index a2e4ac0bf0..fb067b9110 100644 --- a/sync/sync.go +++ b/sync/sync.go @@ -12,6 +12,7 @@ import ( "github.com/NethermindEth/juno/core/felt" "github.com/NethermindEth/juno/db" "github.com/NethermindEth/juno/feed" + junoplugin "github.com/NethermindEth/juno/plugin" "github.com/NethermindEth/juno/service" "github.com/NethermindEth/juno/starknetdata" "github.com/NethermindEth/juno/utils" @@ -72,6 +73,7 @@ type Synchronizer struct { pendingPollInterval time.Duration catchUpMode bool + plugin *junoplugin.JunoPlugin } func New(bc *blockchain.Blockchain, starkNetData starknetdata.StarknetData, @@ -89,6 +91,12 @@ func New(bc *blockchain.Blockchain, starkNetData starknetdata.StarknetData, return s } +// WithPlugin registers an plugin +func (s *Synchronizer) WithPlugin(plugin junoplugin.JunoPlugin) *Synchronizer { + s.plugin = &plugin + return s +} + // WithListener registers an EventListener func (s *Synchronizer) WithListener(listener EventListener) *Synchronizer { s.listener = listener @@ -180,6 +188,32 @@ func (s *Synchronizer) fetchUnknownClasses(ctx context.Context, stateUpdate *cor return newClasses, closer() } +func (s *Synchronizer) handlePluginRevertBlock(block *core.Block, stateUpdate *core.StateUpdate, reverseStateDiff *core.StateDiff) { + if s.plugin == nil { + return + } + + toBlock, err := s.blockchain.Head() + if err != nil { + s.log.Warnw("Failed to retrieve the reverted blockchain head block for the plugin", "err", err) + return + } + + toSU, err := s.blockchain.StateUpdateByNumber(toBlock.Number) + if err != nil { + s.log.Warnw("Failed to retrieve the reverted blockchain head state-update for the plugin", "err", err) + return + } + + err = (*s.plugin).RevertBlock( + &junoplugin.BlockAndStateUpdate{Block: block, StateUpdate: stateUpdate}, + &junoplugin.BlockAndStateUpdate{Block: toBlock, StateUpdate: toSU}, + reverseStateDiff) + if err != nil { + s.log.Errorw("Plugin RevertBlock failure:", "err", err) + } +} + func (s *Synchronizer) verifierTask(ctx context.Context, block *core.Block, stateUpdate *core.StateUpdate, newClasses map[felt.Felt]core.Class, resetStreams context.CancelFunc, ) stream.Callback { @@ -205,7 +239,14 @@ func (s *Synchronizer) verifierTask(ctx context.Context, block *core.Block, stat // revert the head and restart the sync process, hoping that the reorg is not deep // if the reorg is deeper, we will end up here again and again until we fully revert reorged // blocks + reverseStateDiff, err := s.blockchain.GetReverseStateDiff() + if err != nil { + s.log.Warnw("Failed to retrieve reverse state diff", "head", block.Number, "hash", block.Hash.ShortString(), "err", err) + } s.revertHead(block) + if s.plugin != nil { + s.handlePluginRevertBlock(block, stateUpdate, reverseStateDiff) + } } else { s.log.Warnw("Failed storing Block", "number", block.Number, "hash", block.Hash.ShortString(), "err", err) @@ -231,6 +272,12 @@ func (s *Synchronizer) verifierTask(ctx context.Context, block *core.Block, stat s.newHeads.Send(block.Header) s.log.Infow("Stored Block", "number", block.Number, "hash", block.Hash.ShortString(), "root", block.GlobalStateRoot.ShortString()) + if s.plugin != nil { + err := (*s.plugin).NewBlock(block, stateUpdate, newClasses) + if err != nil { + s.log.Errorw("Plugin NewBlock failure:", err) + } + } } } } @@ -317,7 +364,9 @@ func (s *Synchronizer) revertHead(forkBlock *core.Block) { } s.log.Infow("Reorg detected", "localHead", localHead, "forkHead", forkBlock.Hash) - + if err != nil { + s.log.Warnw("Failed getting reverse state-diff, err: ", err) + } err = s.blockchain.RevertHead() if err != nil { s.log.Warnw("Failed reverting HEAD", "reverted", localHead, "err", err)