Skip to content

Commit

Permalink
Plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
rian committed Aug 14, 2024
1 parent dc46ab3 commit c08159a
Show file tree
Hide file tree
Showing 8 changed files with 132 additions and 5 deletions.
21 changes: 20 additions & 1 deletion blockchain/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -828,6 +828,26 @@ func (b *Blockchain) RevertHead() error {
return b.database.Update(b.revertHead)
}

func (b *Blockchain) GetReverseStateDiff() (*core.StateDiff, error) {
var reverseStateDiff *core.StateDiff
return reverseStateDiff, b.database.View(func(txn db.Transaction) error {
blockNumber, err := chainHeight(txn)
if err != nil {
return err
}
stateUpdate, err := stateUpdateByNumber(txn, blockNumber)
if err != nil {
return err
}
state := core.NewState(txn)
reverseStateDiff, err = state.BuildReverseDiff(blockNumber, stateUpdate.StateDiff)
if err != nil {
return err
}
return nil
})
}

func (b *Blockchain) revertHead(txn db.Transaction) error {
blockNumber, err := chainHeight(txn)
if err != nil {
Expand Down Expand Up @@ -874,7 +894,6 @@ func (b *Blockchain) revertHead(txn db.Transaction) error {
}

// Revert chain height and pending.

if genesisBlock {
if err = txn.Delete(db.Pending.Key()); err != nil {
return err
Expand Down
4 changes: 4 additions & 0 deletions cmd/juno/juno.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ const (
callMaxStepsF = "rpc-call-max-steps"
corsEnableF = "rpc-cors-enable"
versionedConstantsFileF = "versioned-constants-file"
pluginPathF = "plugin-path"

defaultConfig = ""
defaulHost = "localhost"
Expand Down Expand Up @@ -119,6 +120,7 @@ const (
defaultGwTimeout = 5 * time.Second
defaultCorsEnable = false
defaultVersionedConstantsFile = ""
defaultPluginPath = ""

configFlagUsage = "The YAML configuration file."
logLevelFlagUsage = "Options: trace, debug, info, warn, error."
Expand Down Expand Up @@ -170,6 +172,7 @@ const (
"The upper limit is 4 million steps, and any higher value will still be capped at 4 million."
corsEnableUsage = "Enable CORS on RPC endpoints"
versionedConstantsFileUsage = "Use custom versioned constants from provided file"
pluginPathUsage = "Path to the plugins .so file"
)

var Version string
Expand Down Expand Up @@ -355,6 +358,7 @@ func NewCmd(config *node.Config, run func(*cobra.Command, []string) error) *cobr
junoCmd.Flags().Bool(corsEnableF, defaultCorsEnable, corsEnableUsage)
junoCmd.Flags().String(versionedConstantsFileF, defaultVersionedConstantsFile, versionedConstantsFileUsage)
junoCmd.MarkFlagsMutuallyExclusive(p2pFeederNodeF, p2pPeersF)
junoCmd.Flags().String(pluginPathF, defaultPluginPath, pluginPathUsage)

junoCmd.AddCommand(GenP2PKeyPair(), DBCmd(defaultDBPath))

Expand Down
4 changes: 2 additions & 2 deletions core/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,7 @@ func (s *State) Revert(blockNumber uint64, update *StateUpdate) error {
}

// update contracts
reversedDiff, err := s.buildReverseDiff(blockNumber, update.StateDiff)
reversedDiff, err := s.BuildReverseDiff(blockNumber, update.StateDiff)
if err != nil {
return fmt.Errorf("build reverse diff: %v", err)
}
Expand Down Expand Up @@ -660,7 +660,7 @@ func (s *State) purgeContract(addr *felt.Felt) error {
return storageCloser()
}

func (s *State) buildReverseDiff(blockNumber uint64, diff *StateDiff) (*StateDiff, error) {
func (s *State) BuildReverseDiff(blockNumber uint64, diff *StateDiff) (*StateDiff, error) {
reversed := *diff

// storage diffs
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
github.com/ethereum/go-ethereum v1.14.7
github.com/fxamacker/cbor/v2 v2.7.0
github.com/go-playground/validator/v10 v10.22.0
github.com/golang/protobuf v1.5.4
github.com/jinzhu/copier v0.4.0
github.com/libp2p/go-libp2p v0.34.0
github.com/libp2p/go-libp2p-kad-dht v0.25.2
Expand Down
2 changes: 1 addition & 1 deletion grpc/gen/kv_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/NethermindEth/juno/l1"
"github.com/NethermindEth/juno/migration"
"github.com/NethermindEth/juno/p2p"
junoplugin "github.com/NethermindEth/juno/plugin"
"github.com/NethermindEth/juno/rpc"
"github.com/NethermindEth/juno/service"
adaptfeeder "github.com/NethermindEth/juno/starknetdata/feeder"
Expand Down Expand Up @@ -87,6 +88,8 @@ type Config struct {

GatewayAPIKey string `mapstructure:"gw-api-key"`
GatewayTimeout time.Duration `mapstructure:"gw-timeout"`

PluginPath string `mapstructure:"plugin-path"`
}

type Node struct {
Expand Down Expand Up @@ -155,6 +158,14 @@ func New(cfg *Config, version string) (*Node, error) { //nolint:gocyclo,funlen
synchronizer := sync.New(chain, adaptfeeder.New(client), log, cfg.PendingPollInterval, dbIsRemote)
gatewayClient := gateway.NewClient(cfg.Network.GatewayURL, log).WithUserAgent(ua).WithAPIKey(cfg.GatewayAPIKey)

if cfg.PluginPath != "" {
plugin, err := junoplugin.Load(cfg.PluginPath)
if err != nil {
return nil, err
}
synchronizer.WithPlugin(plugin)
}

var p2pService *p2p.Service
if cfg.P2P {
if cfg.Network != utils.Sepolia {
Expand Down
43 changes: 43 additions & 0 deletions plugin/plugin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package junoplugin

import (
"fmt"
"plugin"

"github.com/NethermindEth/juno/core"
"github.com/NethermindEth/juno/core/felt"
)

//go:generate mockgen -destination=../mocks/mock_plugin.go -package=mocks github.com/NethermindEth/juno/plugin JunoPlugin
type JunoPlugin interface {
Init() error
Shutdown() error // Todo: Currently this function will never be called.
NewBlock(block *core.Block, stateUpdate *core.StateUpdate, newClasses map[felt.Felt]core.Class) error
// The state is reverted by applying a write operation with the reverseStateDiff's StorageDiffs, Nonces, and ReplacedClasses,
// and a delete option with its DeclaredV0Classes, DeclaredV1Classes, and ReplacedClasses.
RevertBlock(from, to *BlockAndStateUpdate, reverseStateDiff *core.StateDiff) error
}

type BlockAndStateUpdate struct {
Block *core.Block
StateUpdate *core.StateUpdate
}

func Load(pluginPath string) (JunoPlugin, error) {
plug, err := plugin.Open(pluginPath)
if err != nil {
return nil, fmt.Errorf("error loading plugin .so file: %w", err)
}

symPlugin, err := plug.Lookup("JunoPluginInstance")
if err != nil {
return nil, fmt.Errorf("error looking up PluginInstance: %w", err)
}

pluginInstance, ok := symPlugin.(JunoPlugin)
if !ok {
return nil, fmt.Errorf("the plugin does not staisfy the required interface")
}

return pluginInstance, pluginInstance.Init()
}
51 changes: 50 additions & 1 deletion sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/NethermindEth/juno/core/felt"
"github.com/NethermindEth/juno/db"
"github.com/NethermindEth/juno/feed"
junoplugin "github.com/NethermindEth/juno/plugin"
"github.com/NethermindEth/juno/service"
"github.com/NethermindEth/juno/starknetdata"
"github.com/NethermindEth/juno/utils"
Expand Down Expand Up @@ -72,6 +73,7 @@ type Synchronizer struct {

pendingPollInterval time.Duration
catchUpMode bool
plugin *junoplugin.JunoPlugin
}

func New(bc *blockchain.Blockchain, starkNetData starknetdata.StarknetData,
Expand All @@ -89,6 +91,12 @@ func New(bc *blockchain.Blockchain, starkNetData starknetdata.StarknetData,
return s
}

// WithPlugin registers an plugin
func (s *Synchronizer) WithPlugin(plugin junoplugin.JunoPlugin) *Synchronizer {
s.plugin = &plugin
return s
}

// WithListener registers an EventListener
func (s *Synchronizer) WithListener(listener EventListener) *Synchronizer {
s.listener = listener
Expand Down Expand Up @@ -180,6 +188,32 @@ func (s *Synchronizer) fetchUnknownClasses(ctx context.Context, stateUpdate *cor
return newClasses, closer()
}

func (s *Synchronizer) handlePluginRevertBlock(block *core.Block, stateUpdate *core.StateUpdate, reverseStateDiff *core.StateDiff) {
if s.plugin == nil {
return
}

toBlock, err := s.blockchain.Head()
if err != nil {
s.log.Warnw("Failed to retrieve the reverted blockchain head block for the plugin", "err", err)
return
}

toSU, err := s.blockchain.StateUpdateByNumber(toBlock.Number)
if err != nil {
s.log.Warnw("Failed to retrieve the reverted blockchain head state-update for the plugin", "err", err)
return
}

err = (*s.plugin).RevertBlock(
&junoplugin.BlockAndStateUpdate{Block: block, StateUpdate: stateUpdate},
&junoplugin.BlockAndStateUpdate{Block: toBlock, StateUpdate: toSU},
reverseStateDiff)
if err != nil {
s.log.Errorw("Plugin RevertBlock failure:", "err", err)
}
}

func (s *Synchronizer) verifierTask(ctx context.Context, block *core.Block, stateUpdate *core.StateUpdate,
newClasses map[felt.Felt]core.Class, resetStreams context.CancelFunc,
) stream.Callback {
Expand All @@ -205,7 +239,14 @@ func (s *Synchronizer) verifierTask(ctx context.Context, block *core.Block, stat
// revert the head and restart the sync process, hoping that the reorg is not deep
// if the reorg is deeper, we will end up here again and again until we fully revert reorged
// blocks
reverseStateDiff, err := s.blockchain.GetReverseStateDiff()
if err != nil {
s.log.Warnw("Failed to retrieve reverse state diff", "head", block.Number, "hash", block.Hash.ShortString(), "err", err)
}
s.revertHead(block)
if s.plugin != nil {
s.handlePluginRevertBlock(block, stateUpdate, reverseStateDiff)
}
} else {
s.log.Warnw("Failed storing Block", "number", block.Number,
"hash", block.Hash.ShortString(), "err", err)
Expand All @@ -231,6 +272,12 @@ func (s *Synchronizer) verifierTask(ctx context.Context, block *core.Block, stat
s.newHeads.Send(block.Header)
s.log.Infow("Stored Block", "number", block.Number, "hash",
block.Hash.ShortString(), "root", block.GlobalStateRoot.ShortString())
if s.plugin != nil {
err := (*s.plugin).NewBlock(block, stateUpdate, newClasses)
if err != nil {
s.log.Errorw("Plugin NewBlock failure:", err)
}
}
}
}
}
Expand Down Expand Up @@ -317,7 +364,9 @@ func (s *Synchronizer) revertHead(forkBlock *core.Block) {
}

s.log.Infow("Reorg detected", "localHead", localHead, "forkHead", forkBlock.Hash)

if err != nil {
s.log.Warnw("Failed getting reverse state-diff, err: ", err)
}
err = s.blockchain.RevertHead()
if err != nil {
s.log.Warnw("Failed reverting HEAD", "reverted", localHead, "err", err)
Expand Down

0 comments on commit c08159a

Please sign in to comment.