Skip to content

Commit

Permalink
Parallelize
Browse files Browse the repository at this point in the history
  • Loading branch information
AnkushinDaniil committed Nov 20, 2024
1 parent 9b2feef commit 2126ab9
Showing 1 changed file with 86 additions and 48 deletions.
134 changes: 86 additions & 48 deletions migration/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"maps"
"runtime"
"sync"
"time"

"github.com/NethermindEth/juno/adapters/sn2core"
"github.com/NethermindEth/juno/blockchain"
Expand Down Expand Up @@ -769,76 +768,115 @@ var sepoliaBlockHashes []byte

const first0132SepoliaBlock = 86311

func updatePre0132Blocks(txn db.Transaction, network *utils.Network) error {
// TODO: remove when possible
if network.Name != utils.Sepolia.Name {
return nil
}
type commitmentsParams struct {
blockNumber uint64
block *core.Block
stateDiff *core.StateDiff
}

timeToSetBytes := time.Duration(0)
timeToStoreP2PHash := time.Duration(0)
type commitmentsResult struct {
blockNumber uint64
commitments *core.BlockCommitments
}

func storeP2PHashes(txn db.Transaction) error {
p2pHash := new(felt.Felt)
var start time.Time
for blockNumber := uint64(0); blockNumber < first0132SepoliaBlock; blockNumber++ {
offset := blockNumber * 32
start = time.Now()
p2pHash.SetBytes(sepoliaBlockHashes[offset : offset+32])
timeToSetBytes += time.Since(start)
start = time.Now()
if err := blockchain.StoreP2PHash(txn, blockNumber, p2pHash); err != nil {
return err
}
timeToStoreP2PHash += time.Since(start)
}
return nil
}

func fetchBlockAndStateUpdate(txn db.Transaction, blockNumber uint64) (commitmentsParams, error) {
block, err := blockchain.BlockByNumber(txn, blockNumber)
if err != nil {
return commitmentsParams{}, err
}

stateUpdate, err := blockchain.StateUpdateByNumber(txn, blockNumber)
if err != nil {
return commitmentsParams{}, err
}

fmt.Printf("Time to set bytes: %v\n", timeToSetBytes)
fmt.Printf("Time to store p2p hash: %v\n", timeToStoreP2PHash)
return commitmentsParams{blockNumber, block, stateUpdate.StateDiff}, nil
}

timeToGetBlock := time.Duration(0)
timeToGetStateUpdate := time.Duration(0)
timeToPost0132Hash := time.Duration(0)
timeToStoreBlockCommitments := time.Duration(0)
// Migration to store p2p hashes and update commitments for pre-0132 blocks
func updatePre0132Blocks(txn db.Transaction, network *utils.Network) error {
if network.Name != utils.Sepolia.Name {
return nil
}

for blockNumber := uint64(0); blockNumber < first0132SepoliaBlock; blockNumber++ {
start = time.Now()
block, err := blockchain.BlockByNumber(txn, blockNumber)
if err != nil {
if errors.Is(err, db.ErrKeyNotFound) {
break
if err := storeP2PHashes(txn); err != nil {
return err
}

workers := runtime.GOMAXPROCS(0)
cp := make(chan commitmentsParams, workers)
cr := make(chan commitmentsResult, workers)
errCh := make(chan error, 1)
defer close(errCh)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

go func() {
defer close(cp)
for blockNumber := uint64(0); blockNumber < first0132SepoliaBlock; blockNumber++ {
if ctx.Err() != nil {
return
}
return err
params, err := fetchBlockAndStateUpdate(txn, blockNumber)
if err != nil {
if !errors.Is(err, db.ErrKeyNotFound) {
errCh <- err
cancel()
}
return
}
cp <- params
}
timeToGetBlock += time.Since(start)
}()

start = time.Now()
stateUpdate, err := blockchain.StateUpdateByNumber(txn, blockNumber)
if err != nil {
if errors.Is(err, db.ErrKeyNotFound) {
break
wg := &sync.WaitGroup{}
wg.Add(workers)
for range workers {
go func() {
defer wg.Done()
for params := range cp {
if ctx.Err() != nil {
return
}
_, commitments, err := core.Post0132Hash(params.block, params.stateDiff)
if err != nil {
errCh <- err
cancel()
return
}
cr <- commitmentsResult{params.blockNumber, commitments}
}
return err
}
timeToGetStateUpdate += time.Since(start)
}()
}

start = time.Now()
_, commitments, err := core.Post0132Hash(block, stateUpdate.StateDiff)
if err != nil {
return err
}
timeToPost0132Hash += time.Since(start)
go func() {
wg.Wait()
close(cr)
}()

start = time.Now()
if err := blockchain.StoreBlockCommitments(txn, blockNumber, commitments); err != nil {
for result := range cr {
if err := blockchain.StoreBlockCommitments(txn, result.blockNumber, result.commitments); err != nil {
return err
}
timeToStoreBlockCommitments += time.Since(start)
}

fmt.Printf("Time to get block: %v\n", timeToGetBlock)
fmt.Printf("Time to get state update: %v\n", timeToGetStateUpdate)
fmt.Printf("Time to post 0132 hash: %v\n", timeToPost0132Hash)
fmt.Printf("Time to store block commitments: %v\n", timeToStoreBlockCommitments)
select {
case err := <-errCh:
return fmt.Errorf("error while updating pre-0132 blocks: %w", err)
default:
}

return nil
}

0 comments on commit 2126ab9

Please sign in to comment.