PoC
arnaubennassar committed Jul 1, 2024
1 parent 23c189e commit dbb1122
Showing 5 changed files with 221 additions and 94 deletions.
117 changes: 56 additions & 61 deletions localbridgesync/downloader.go
@@ -4,6 +4,7 @@ import (
"context"
"errors"
"math/big"
"time"

"github.com/0xPolygon/cdk-contracts-tooling/contracts/etrog/polygonzkevmbridge"
"github.com/0xPolygon/cdk-contracts-tooling/contracts/etrog/polygonzkevmbridgev2"
@@ -18,53 +19,60 @@ var (
claimEventSignaturePreEtrog = crypto.Keccak256Hash([]byte("ClaimEvent(uint32,uint32,address,address,uint256)"))
)

type EthClienter interface {
ethereum.LogFilterer
ethereum.BlockNumberReader
ethereum.ChainReader
}

type downloader struct {
downloadedCh chan batch
downloadedCh chan block
bridgeAddr common.Address
bridgeContractV2 *polygonzkevmbridgev2.Polygonzkevmbridgev2
bridgeContractV1 *polygonzkevmbridge.Polygonzkevmbridge
ethClient ethereum.LogFilterer
batchToBlocks map[uint64]struct{ from, to uint64 }
ethClient EthClienter
blockToBlocks map[uint64]struct{ from, to uint64 }
syncBlockChunkSize uint64
}

func newDownloader() (*downloader, error) {
return nil, errors.New("not implemented")
}

func (d *downloader) download(ctx context.Context, fromBatchNum uint64) {
fromBlock, err := d.getFirstBlockOfBatch(fromBatchNum)
if err != nil {
// TODO: handle error
return
}
currentBatch := batch{
BatchNum: fromBatchNum,
}
func (d *downloader) download(ctx context.Context, fromBlock uint64, downloadedCh chan block) {
d.downloadedCh = downloadedCh
for {
// get last block
// to block = min(currentBlock+d.syncBlockChunkSize, last block)
select {
case <-ctx.Done():
close(downloadedCh)
return
default:
}
lastBlock, err := d.ethClient.BlockNumber(ctx)
if err != nil {
// TODO: handle error
return
}
toBlock := fromBlock + d.syncBlockChunkSize
if toBlock > lastBlock {
toBlock = lastBlock
}
if fromBlock == toBlock {
time.Sleep(time.Millisecond * 100) // sleep 100ms for the L2 to produce more blocks
continue
}
blocks, err := d.getEventsByBlockRange(ctx, fromBlock, toBlock)
if err != nil {
// TODO: handle error
return
}

// get batch num using: zkevm_batchNumberByBlockNumber
// if batch is greater than currentBatch.BatchNum:
// send currentBatch over downloadedCh, then create new current batch
for _, b := range blocks {
d.downloadedCh <- b
}
fromBlock = toBlock
}
}

type block struct {
num uint64
hash common.Hash
bridges []Bridge
claims []Claim
}

func (d *downloader) getEventsByBlockRange(ctx context.Context, fromBlock, toBlock uint64) ([]block, error) {
blocks := []block{}
query := ethereum.FilterQuery{
@@ -84,12 +92,16 @@ func (d *downloader) getEventsByBlockRange(ctx context.Context, fromBlock, toBlock uint64) ([]block, error) {
}
for _, l := range logs {
lastBlock := blocks[len(blocks)-1]
if lastBlock.num < l.BlockNumber {
// blocks starts empty, so guard the index on the first iteration
if len(blocks) == 0 || blocks[len(blocks)-1].Num < l.BlockNumber {
blocks = append(blocks, block{
num: l.BlockNumber,
hash: l.BlockHash,
claims: []Claim{},
bridges: []Bridge{},
blockHeader: blockHeader{
Num: l.BlockNumber,
Hash: l.BlockHash,
},
Events: bridgeEvents{
Claims: []Claim{},
Bridges: []Bridge{},
},
})
}
switch l.Topics[0] {
@@ -98,7 +110,7 @@ func (d *downloader) getEventsByBlockRange(ctx context.Context, fromBlock, toBlock uint64) ([]block, error) {
if err != nil {
return nil, err
}
blocks[len(blocks)-1].bridges = append(blocks[len(blocks)-1].bridges, Bridge{
blocks[len(blocks)-1].Events.Bridges = append(blocks[len(blocks)-1].Events.Bridges, Bridge{
LeafType: bridge.LeafType,
OriginNetwork: bridge.OriginNetwork,
OriginAddress: bridge.OriginAddress,
@@ -113,7 +125,7 @@ func (d *downloader) getEventsByBlockRange(ctx context.Context, fromBlock, toBlock uint64) ([]block, error) {
if err != nil {
return nil, err
}
blocks[len(blocks)-1].claims = append(blocks[len(blocks)-1].claims, Claim{
blocks[len(blocks)-1].Events.Claims = append(blocks[len(blocks)-1].Events.Claims, Claim{
GlobalIndex: claim.GlobalIndex,
OriginNetwork: claim.OriginNetwork,
OriginAddress: claim.OriginAddress,
@@ -125,8 +137,9 @@ func (d *downloader) getEventsByBlockRange(ctx context.Context, fromBlock, toBlock uint64) ([]block, error) {
if err != nil {
return nil, err
}
blocks[len(blocks)-1].claims = append(blocks[len(blocks)-1].claims, Claim{
blocks[len(blocks)-1].Events.Claims = append(blocks[len(blocks)-1].Events.Claims, Claim{
// WARNING: is it safe to convert Index --> GlobalIndex???
// according to Jesus, yes!
GlobalIndex: big.NewInt(int64(claim.Index)),
OriginNetwork: claim.OriginNetwork,
OriginAddress: claim.OriginAddress,
@@ -141,32 +154,14 @@ func (d *downloader) getEventsByBlockRange(ctx context.Context, fromBlock, toBlock uint64) ([]block, error) {
return blocks, nil
}

func (d *downloader) getFirstBlockOfBatch(batchNum uint64) (uint64, error) {
// how to get first block associated to batch num??? --> zkevm_getBatchByNumber
/*
"result": {
"name": "Batch",
"value": {
"number": "0x1",
"coinbase": "0x0000000000000000000000000000000000000001",
"stateRoot": "0x0000000000000000000000000000000000000000000000000000000000000001",
"globalExitRoot": "0x0000000000000000000000000000000000000000000000000000000000000002",
"mainnetExitRoot": "0x0000000000000000000000000000000000000000000000000000000000000003",
"rollupExitRoot": "0x0000000000000000000000000000000000000000000000000000000000000004",
"localExitRoot": "0x0000000000000000000000000000000000000000000000000000000000000005",
"accInputHash": "0x0000000000000000000000000000000000000000000000000000000000000006",
"timestamp": "0x642af31f",
"sendSequencesTxHash": "0x0000000000000000000000000000000000000000000000000000000000000007",
"verifyBatchTxHash": "0x0000000000000000000000000000000000000000000000000000000000000008",
"transactions": [
"0x0000000000000000000000000000000000000000000000000000000000000009",
"0x0000000000000000000000000000000000000000000000000000000000000010",
"0x0000000000000000000000000000000000000000000000000000000000000011"
]
}
}
NOTE: this endpoint can be flaky, double check the response
*/
return 0, errors.New("not implemented")
func (d *downloader) getBlockHeader(ctx context.Context, blockNum uint64) (blockHeader, error) {
bn := big.NewInt(int64(blockNum))
block, err := d.ethClient.BlockByNumber(ctx, bn)
if err != nil {
return blockHeader{}, err
}
return blockHeader{
Num: block.NumberU64(),
Hash: block.Hash(),
}, nil
}
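
Note: the new code references block, blockHeader and bridgeEvents types that are defined in one of the two changed files not rendered on this page. A minimal sketch of definitions consistent with their usage in downloader.go and driver.go above; the source file name and any additional fields are inferred, not confirmed by the diff:

package localbridgesync

import "github.com/ethereum/go-ethereum/common"

// blockHeader is the minimal per-block data the reorg tracker needs.
type blockHeader struct {
	Num  uint64
	Hash common.Hash
}

// bridgeEvents groups the bridge and claim events found in a single block.
// Bridge and Claim are assumed to predate this commit; their fields appear
// in getEventsByBlockRange above.
type bridgeEvents struct {
	Bridges []Bridge
	Claims  []Claim
}

// block is the unit the downloader sends over the downloaded channel.
// Embedding blockHeader matches the b.Num and b.blockHeader accesses in driver.go.
type block struct {
	blockHeader
	Events bridgeEvents
}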
144 changes: 141 additions & 3 deletions localbridgesync/driver.go
@@ -3,16 +3,154 @@ package localbridgesync
import (
"context"
"errors"
"fmt"
"time"
)

const (
checkReorgInterval = time.Second * 10
downloadBufferSize = 100
)

type driver struct {
p *processor
d *downloader
}

func newDriver() (*driver, error) {
return nil, errors.New("not implemented")
}

func (lbs *LocalBridgeSync) Sync(ctx *context.Context) {
// Download data
// Process data
func (d *driver) Sync(ctx context.Context) {
for {
lastProcessedBlock, err := d.p.getLastProcessedBlock(ctx)
if err != nil {
// TODO: handle error
return
}
cancellableCtx, cancel := context.WithCancel(ctx)
defer cancel()

// start downloading
downloadCh := make(chan block, downloadBufferSize)
go d.d.download(cancellableCtx, lastProcessedBlock, downloadCh)

// detect potential reorgs
reorgCh := make(chan uint64)
go d.detectReorg(cancellableCtx, reorgCh)

syncLoop:
for {
select {
case b := <-downloadCh: // new block from downloader
err = d.storeBlockToReorgTracker(b.blockHeader)
if err != nil {
// TODO: handle error
return
}
err = d.p.storeBridgeEvents(b.Num, b.Events)
if err != nil {
// TODO: handle error
return
}
case lastValidBlock := <-reorgCh: // reorg detected
// stop downloader
cancel()
// wait until the downloader closes the channel, draining any in-flight blocks
for range downloadCh {
}
// handle reorg
err = d.p.reorg(lastValidBlock)
if err != nil {
// TODO: handle error
return
}

// restart syncing; a bare break would only exit the select, so break the labeled loop
break syncLoop
}
}
}
}

// IMO we could make a package "reorg detector" that could be reused by all the syncers
// each instance should use its own storage / bucket

func (d *driver) detectReorg(ctx context.Context, reorgCh chan uint64) {
var (
expectedHeader blockHeader
err error
)
for {
expectedHeader, err = d.getGreatestBlockFromReorgTracker()
if err != nil {
// TODO: handle error
return
}
actualHeader, err := d.d.getBlockHeader(ctx, expectedHeader.Num)
if err != nil {
// TODO: handle error
return
}
if actualHeader.Hash != expectedHeader.Hash {
fmt.Printf("reorg detected")
break
}
time.Sleep(checkReorgInterval)
}
// Find last valid block
oldestTrackedBlock, err := d.getSmallestBlockFromReorgTracker()
if err != nil {
// TODO: handle error
return
}
lastValidBlock := oldestTrackedBlock.Num
for i := expectedHeader.Num - 1; i > lastValidBlock; i-- {
expectedHeader, err = d.getBlockFromReorgTracker(i)
if err != nil {
// TODO: handle error
return
}
actualHeader, err := d.d.getBlockHeader(ctx, expectedHeader.Num)
if err != nil {
// TODO: handle error
return
}
if actualHeader.Hash == expectedHeader.Hash {
fmt.Printf("valid block detected")
reorgCh <- actualHeader.Num
return
}
}
// this should never happen! But if it happens it should delete ALL the data from the processor
// or have logic to handle this "special case"
fmt.Printf("no valid block detected!!!!")
reorgCh <- oldestTrackedBlock.Num
return
}

func (d *driver) storeBlockToReorgTracker(b blockHeader) error {
fmt.Printf("adding block %d with hash %s to the reorg tracker storage\n", b.Num, b.Hash)
return errors.New("not implemented")
}

func (d *driver) removeBlockFromReorgTracker(blockNum uint64) error {
fmt.Printf("removing block %d from the reorg tracker storage\n", blockNum)
return errors.New("not implemented")
}

func (d *driver) getGreatestBlockFromReorgTracker() (blockHeader, error) {
fmt.Println("getting the block with the greatest block num from the reorg tracker storage")
return blockHeader{}, errors.New("not implemented")
}

func (d *driver) getBlockFromReorgTracker(blockNum uint64) (blockHeader, error) {
fmt.Printf("getting the block %d from the reorg tracker storage", blockNum)
return blockHeader{}, errors.New("not implemented")
}

func (d *driver) getSmallestBlockFromReorgTracker() (blockHeader, error) {
fmt.Println("getting the block with the smallest block num from the reorg tracker storage")
return blockHeader{}, errors.New("not implemented")
}
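
On the inline suggestion of a reusable "reorg detector" package: a rough sketch of what its surface could look like, generalizing the five not-implemented tracker functions and detectReorg above. The package name, interfaces and signatures are assumptions for illustration, not part of this commit:

package reorgdetector

import (
	"context"

	"github.com/ethereum/go-ethereum/common"
)

// HeaderStorage persists tracked headers; each syncer instance would use its
// own storage/bucket. The methods mirror the five tracker functions in driver.go.
type HeaderStorage interface {
	Store(num uint64, hash common.Hash) error
	Remove(num uint64) error
	Get(num uint64) (common.Hash, error)
	Greatest() (num uint64, hash common.Hash, err error)
	Smallest() (num uint64, hash common.Hash, err error)
}

// HeaderSource is the minimal chain access the detector needs; a concrete
// implementation could be backed by the downloader's EthClienter.
type HeaderSource interface {
	HeaderByNumber(ctx context.Context, num uint64) (common.Hash, error)
}

// Detector polls the chain, compares tracked hashes against the actual ones,
// and reports the last valid block number over ReorgCh when they diverge.
type Detector struct {
	source  HeaderSource
	storage HeaderStorage
	ReorgCh chan uint64
}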
1 change: 1 addition & 0 deletions localbridgesync/localbridgesync.go
@@ -7,5 +7,6 @@ type LocalBridgeSync struct {
}

func New() (*LocalBridgeSync, error) {
// init driver, processor and downloader
return &LocalBridgeSync{}, errors.New("not implemented")
}
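
For the "init driver, processor and downloader" TODO, the wiring would presumably look like the sketch below. newProcessor and the LocalBridgeSync field names are assumptions (the processor file is not rendered on this page), and the constructors would likely grow parameters so the driver can reach the other two components:

// Sketch only: newProcessor and the struct fields are placeholders.
func New() (*LocalBridgeSync, error) {
	p, err := newProcessor()
	if err != nil {
		return nil, err
	}
	dl, err := newDownloader()
	if err != nil {
		return nil, err
	}
	drv, err := newDriver() // would likely take p and dl once implemented
	if err != nil {
		return nil, err
	}
	return &LocalBridgeSync{processor: p, downloader: dl, driver: drv}, nil
}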