Skip to content

Commit

Permalink
Cont sync mvp
Browse files Browse the repository at this point in the history
  • Loading branch information
kirugan committed Jan 5, 2024
1 parent 979baa2 commit 17bfd68
Showing 1 changed file with 68 additions and 62 deletions.
130 changes: 68 additions & 62 deletions p2p/sync_pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,77 +22,83 @@ func (s *syncService) startPipeline(ctx context.Context) {

s.client = starknet.NewClient(s.randomPeerStream, s.network, s.log)

bootNodeHeight, err := s.bootNodeHeight(ctx)
if err != nil {
s.log.Errorw("Failed to get boot node height", "err", err)
return
}
s.log.Infow("Boot node height", "height", bootNodeHeight)
var bootNodeHeight uint64
for i := 0; ; i++ {
s.log.Infow("Continous iteration", "i", i)

var nextHeight uint64
if curHeight, err := s.blockchain.Height(); err == nil { //nolint:govet
nextHeight = curHeight + 1
} else if !errors.Is(db.ErrKeyNotFound, err) {
s.log.Errorw("Failed to get current height", "err", err)
}

commonIt := s.createIterator(BlockRange{nextHeight, bootNodeHeight})
headersAndSigsCh, err := s.genHeadersAndSigs(ctx, commonIt)
if err != nil {
s.log.Errorw("Failed to get block headers parts", "err", err)
return
}
var err error
bootNodeHeight, err = s.bootNodeHeight(ctx)
if err != nil {
s.log.Errorw("Failed to get boot node height", "err", err)
return
}
s.log.Infow("Boot node height", "height", bootNodeHeight)

blockBodiesCh, err := s.genBlockBodies(ctx, commonIt)
if err != nil {
s.log.Errorw("Failed to get block bodies", "err", err)
return
}
var nextHeight uint64
if curHeight, err := s.blockchain.Height(); err == nil { //nolint:govet
nextHeight = curHeight + 1
} else if !errors.Is(db.ErrKeyNotFound, err) {
s.log.Errorw("Failed to get current height", "err", err)
}

txsCh, err := s.genTransactions(ctx, commonIt)
if err != nil {
s.log.Errorw("Failed to get transactions", "err", err)
return
}
commonIt := s.createIterator(BlockRange{nextHeight, bootNodeHeight})
headersAndSigsCh, err := s.genHeadersAndSigs(ctx, commonIt)
if err != nil {
s.log.Errorw("Failed to get block headers parts", "err", err)
return
}

receiptsCh, err := s.genReceipts(ctx, commonIt)
if err != nil {
s.log.Errorw("Failed to get receipts", "err", err)
return
}
blockBodiesCh, err := s.genBlockBodies(ctx, commonIt)
if err != nil {
s.log.Errorw("Failed to get block bodies", "err", err)
return
}

eventsCh, err := s.genEvents(ctx, commonIt)
if err != nil {
s.log.Errorw("Failed to get events", "err", err)
return
}
txsCh, err := s.genTransactions(ctx, commonIt)
if err != nil {
s.log.Errorw("Failed to get transactions", "err", err)
return
}

// A channel of a specific type cannot be converted to a channel of another type. Therefore, we have to consume/read from the channel
// and change the input to the desired type. The following is not allowed:
// var ch1 chan any = make(chan any)
// var ch2 chan someOtherType = make(chan someOtherType)
// ch2 = (chan any)(ch2) <----- This line will give compilation error.

for b := range pipeline.Bridge(ctx,
s.processSpecBlockParts(ctx, nextHeight,
pipeline.FanIn(ctx,
pipeline.Stage(ctx, headersAndSigsCh, func(i specBlockHeaderAndSigs) specBlockParts { return i }),
pipeline.Stage(ctx, blockBodiesCh, func(i specBlockBody) specBlockParts { return i }),
pipeline.Stage(ctx, txsCh, func(i specTransactions) specBlockParts { return i }),
pipeline.Stage(ctx, receiptsCh, func(i specReceipts) specBlockParts { return i }),
pipeline.Stage(ctx, eventsCh, func(i specEvents) specBlockParts { return i }),
))) {
if b.err != nil {
// cannot process any more blocks
s.log.Errorw("Failed to process block", "err", b.err)
receiptsCh, err := s.genReceipts(ctx, commonIt)
if err != nil {
s.log.Errorw("Failed to get receipts", "err", err)
return
}
err = s.blockchain.Store(b.block, b.commitments, b.stateUpdate, b.newClasses)

eventsCh, err := s.genEvents(ctx, commonIt)
if err != nil {
s.log.Errorw("Failed to Store Block", "number", b.block.Number, "err", err)
} else {
s.log.Infow("Stored Block", "number", b.block.Number, "hash", b.block.Hash.ShortString(), "root",
b.block.GlobalStateRoot.ShortString())
s.log.Errorw("Failed to get events", "err", err)
return
}

// A channel of a specific type cannot be converted to a channel of another type. Therefore, we have to consume/read from the channel
// and change the input to the desired type. The following is not allowed:
// var ch1 chan any = make(chan any)
// var ch2 chan someOtherType = make(chan someOtherType)
// ch2 = (chan any)(ch2) <----- This line will give compilation error.

for b := range pipeline.Bridge(ctx,
s.processSpecBlockParts(ctx, nextHeight,
pipeline.FanIn(ctx,
pipeline.Stage(ctx, headersAndSigsCh, func(i specBlockHeaderAndSigs) specBlockParts { return i }),
pipeline.Stage(ctx, blockBodiesCh, func(i specBlockBody) specBlockParts { return i }),
pipeline.Stage(ctx, txsCh, func(i specTransactions) specBlockParts { return i }),
pipeline.Stage(ctx, receiptsCh, func(i specReceipts) specBlockParts { return i }),
pipeline.Stage(ctx, eventsCh, func(i specEvents) specBlockParts { return i }),
))) {
if b.err != nil {
// cannot process any more blocks
s.log.Errorw("Failed to process block", "err", b.err)
return
}
err = s.blockchain.Store(b.block, b.commitments, b.stateUpdate, b.newClasses)
if err != nil {
s.log.Errorw("Failed to Store Block", "number", b.block.Number, "err", err)
} else {
s.log.Infow("Stored Block", "number", b.block.Number, "hash", b.block.Hash.ShortString(), "root",
b.block.GlobalStateRoot.ShortString())
}
}
}
}
Expand Down

0 comments on commit 17bfd68

Please sign in to comment.