
Commit

Fix issues
IronGauntlets committed Jan 17, 2024
1 parent f6c97f7 commit ff91487
Showing 6 changed files with 37 additions and 55 deletions.
9 changes: 4 additions & 5 deletions p2p/starknet/block_bodies.go
@@ -2,7 +2,6 @@ package starknet

import (
"crypto/rand"
- "fmt"
"slices"

"github.com/NethermindEth/juno/adapters/core2p2p"
@@ -75,7 +74,6 @@ func (b *blockBodyIterator) next() (msg proto.Message, valid bool) {
b.step = sendClasses
case sendClasses:
msg, valid = b.classes()
- fmt.Println("Returned from classes()")
b.step = sendProof
case sendProof:
msg, valid = b.proof()
@@ -93,7 +91,6 @@ func (b *blockBodyIterator) next() (msg proto.Message, valid bool) {
}

func (b *blockBodyIterator) classes() (proto.Message, bool) {
- fmt.Println("BlockBodyIterator: classes", b.header.Number)
var classes []*spec.Class

stateDiff := b.stateUpdate.StateDiff
@@ -114,15 +111,17 @@ func (b *blockBodyIterator) classes() (proto.Message, bool) {
classes = append(classes, core2p2p.AdaptClass(cls.Class))
}

- return &spec.BlockBodiesResponse{
+ res := &spec.BlockBodiesResponse{
Id: b.blockID(),
BodyMessage: &spec.BlockBodiesResponse_Classes{
Classes: &spec.Classes{
Domain: 0,
Classes: classes,
},
},
- }, true
+ }
+
+ return res, true
}

type contractDiff struct {
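Context for the change above: blockBodyIterator.next() is a small state machine that emits one body part per call and then advances its step. A stand-alone sketch of that pattern, with illustrative step names and plain strings in place of Juno's actual spec types:

```go
package main

import "fmt"

// step enumerates the parts of a block body, emitted in a fixed order.
type step int

const (
	sendDiff step = iota
	sendClasses
	sendProof
	done
)

type bodyIterator struct{ step step }

// next mirrors the shape of blockBodyIterator.next above: return the
// message for the current step, then advance to the following one.
func (it *bodyIterator) next() (msg string, valid bool) {
	switch it.step {
	case sendDiff:
		msg, valid = "state diff", true
		it.step = sendClasses
	case sendClasses:
		msg, valid = "classes", true
		it.step = sendProof
	case sendProof:
		msg, valid = "proof", true
		it.step = done
	default: // done: nothing left to emit
		return "", false
	}
	return msg, valid
}

func main() {
	it := &bodyIterator{}
	for msg, ok := it.next(); ok; msg, ok = it.next() {
		fmt.Println(msg)
	}
}
```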
18 changes: 12 additions & 6 deletions p2p/starknet/client.go
@@ -2,7 +2,9 @@ package starknet

import (
"context"
+ "errors"
"fmt"
+ "io"

"github.com/NethermindEth/juno/p2p/starknet/spec"
"github.com/NethermindEth/juno/utils"
@@ -41,7 +43,10 @@ func sendAndCloseWrite(stream network.Stream, req proto.Message) error {
}

func receiveInto(stream network.Stream, res proto.Message) error {
- return protodelim.UnmarshalFrom(&byteReader{stream}, res)
+ unmarshaller := protodelim.UnmarshalOptions{
+ MaxSize: 10 << 20, // 10 MB
+ }
+ return unmarshaller.UnmarshalFrom(&byteReader{stream}, res)
}

func requestAndReceiveStream[ReqT proto.Message, ResT proto.Message](ctx context.Context,
@@ -62,12 +67,13 @@ func requestAndReceiveStream[ReqT proto.Message, ResT proto.Message](ctx context
var zero ResT
res := zero.ProtoReflect().New().Interface()
if err := receiveInto(stream, res); err != nil {
- // todo: check for a specific error otherwise log the error if it doesn't match
- closeErr := stream.Close() // todo: dont ignore close errors
+ if !errors.Is(err, io.EOF) {
+ fmt.Println("Error while reading from stream", err)
+ }
+
+ closeErr := stream.Close()
if closeErr != nil {
- fmt.Printf("-----------Close stream error-------------: %v\n", closeErr)
- } else {
- fmt.Printf("stream %v is closed for protocol %v\n", id, stream.Protocol())
+ fmt.Println("Error while closing stream", closeErr)
}
return zero, false
}
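The receiveInto change above bounds each length-delimited protobuf read at 10 MB, so a peer cannot make the node allocate arbitrary memory from a single size prefix. A self-contained sketch of the same guard; the wrapperspb message is purely illustrative:

```go
package main

import (
	"bytes"
	"fmt"

	"google.golang.org/protobuf/encoding/protodelim"
	"google.golang.org/protobuf/types/known/wrapperspb"
)

func main() {
	// Write one size-prefixed message, as a peer would onto a stream.
	var buf bytes.Buffer
	if _, err := protodelim.MarshalTo(&buf, wrapperspb.String("hello")); err != nil {
		panic(err)
	}

	// Read it back under a hard cap: a message whose prefix declares more
	// than MaxSize bytes fails fast instead of exhausting memory.
	opts := protodelim.UnmarshalOptions{MaxSize: 10 << 20} // 10 MB, as in receiveInto
	msg := &wrapperspb.StringValue{}
	if err := opts.UnmarshalFrom(&buf, msg); err != nil {
		panic(err)
	}
	fmt.Println(msg.GetValue()) // hello
}
```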
12 changes: 4 additions & 8 deletions p2p/starknet/handlers.go
@@ -76,11 +76,6 @@ func streamHandler[ReqT proto.Message](stream network.Stream,
}

for msg, valid := response(); valid; msg, valid = response() {
- if stream.Conn().IsClosed() {
- fmt.Println("Connection is closed for ", stream.ID(), stream.Protocol())
- break
- }
-
if _, err := protodelim.MarshalTo(stream, msg); err != nil { // todo: figure out if we need buffered io here
// log.Debugw("Error writing response", "peer", stream.ID(), "protocol", stream.Protocol(), "err", err)
fmt.Println("Error writing response", "peer", stream.ID(), "protocol", stream.Protocol(), "err", err)
@@ -145,7 +140,7 @@ func (h *Handler) blockHeaders(it *iterator, fin Stream[proto.Message]) Stream[p
return fin()
}
it.Next()
- // fmt.Printf("Created iterator for header at blockNumber %d\n", header.Number)
+ fmt.Printf("Created Block Header Iterator for blockNumber %d\n", header.Number)

commitments, err := h.bcReader.BlockCommitmentsByNumber(header.Number)
if err != nil {
@@ -201,10 +196,11 @@ func (h *Handler) onBlockBodiesRequest(req *spec.BlockBodiesRequest) (Stream[pro
}
it.Next()

- // fmt.Printf("Creating blockBodyIterator for blockNumber %d\n", header.Number)
+ fmt.Printf("Creating Block Body Iterator for blockNumber %d\n", header.Number)
bodyIterator, err = newBlockBodyIterator(h.bcReader, header, h.log)
if err != nil {
- h.log.Errorw("Failed to create block body iterator", "err", err)
+ // h.log.Errorw("Failed to create block body iterator", "err", err)
+ fmt.Println("Failed to create block body iterator", "err", err)
return fin()
}
// no need to call hasNext since it's first iteration over a block
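Dropping the stream.Conn().IsClosed() pre-check above leaves loop termination to the write itself: once the connection is gone, the next protodelim.MarshalTo returns an error and the handler breaks out. A runnable sketch of that error-driven loop, with a stub writer standing in for a dead stream (the iterator closure is illustrative, not Juno's actual Stream type):

```go
package main

import (
	"fmt"

	"google.golang.org/protobuf/encoding/protodelim"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/types/known/wrapperspb"
)

// failingWriter simulates a stream whose underlying connection has closed.
type failingWriter struct{}

func (failingWriter) Write(p []byte) (int, error) {
	return 0, fmt.Errorf("stream reset")
}

func main() {
	msgs := []proto.Message{wrapperspb.String("a"), wrapperspb.String("b")}
	i := 0
	response := func() (proto.Message, bool) { // stand-in for the handler's response iterator
		if i >= len(msgs) {
			return nil, false
		}
		i++
		return msgs[i-1], true
	}

	// No IsClosed polling: the failed write is what ends the loop.
	for msg, valid := response(); valid; msg, valid = response() {
		if _, err := protodelim.MarshalTo(failingWriter{}, msg); err != nil {
			fmt.Println("stopping:", err)
			break
		}
	}
}
```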
2 changes: 1 addition & 1 deletion p2p/sync.go
@@ -640,7 +640,7 @@ func (s *syncService) createIterator(r BlockRange) *spec.Iteration {
return &spec.Iteration{
Start: &spec.Iteration_BlockNumber{r.Start},
Direction: spec.Iteration_Forward,
- Limit: limit,
+ Limit: limit + 1,
Step: 1,
}
}
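The Limit bump above reads as an off-by-one fix: iteration bounds here are inclusive, and an inclusive range [start, end] contains end - start + 1 blocks, so a limit computed as a plain difference of the endpoints stops one block short. (That limit is derived this way is an assumption; its computation is outside this excerpt.) The arithmetic:

```go
package main

import "fmt"

// blocksInRange counts the blocks an inclusive range covers.
func blocksInRange(start, end uint64) uint64 {
	return end - start + 1 // both endpoints are synced
}

func main() {
	// Syncing blocks 100..104 means five blocks; a limit of end-start (4)
	// would leave the head block unfetched.
	fmt.Println(blocksInRange(100, 104)) // 5
}
```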
36 changes: 12 additions & 24 deletions p2p/sync_pipeline.go
@@ -6,8 +6,6 @@ import (
"fmt"
"time"

- "github.com/davecgh/go-spew/spew"
-
"github.com/NethermindEth/juno/adapters/p2p2core"
"github.com/NethermindEth/juno/core"
"github.com/NethermindEth/juno/core/felt"
@@ -38,7 +36,6 @@ func (s *syncService) startPipeline(ctx context.Context) {
s.logError("Failed to get boot node height", err)
return
}
- s.log.Infow("Boot node height", "height", bootNodeHeight)

var nextHeight uint64
if curHeight, err := s.blockchain.Height(); err == nil { //nolint:govet
@@ -47,12 +44,14 @@ func (s *syncService) startPipeline(ctx context.Context) {
s.log.Errorw("Failed to get current height", "err", err)
}

- fmt.Println("Next height", nextHeight)
- if bootNodeHeight-nextHeight == 0 {
- time.Sleep(time.Second)
+ if bootNodeHeight-(nextHeight-1) == 0 {
+ s.log.Infow("Bootnode height is the same as local height, retrying in 30s")
+ time.Sleep(30 * time.Second)
continue
}

+ s.log.Infow("Start Pipeline", "Bootnode height", bootNodeHeight, "Current height", nextHeight-1)
+
commonIt := s.createIterator(BlockRange{nextHeight, bootNodeHeight})
headersAndSigsCh, err := s.genHeadersAndSigs(ctx, commonIt)
if err != nil {
@@ -104,7 +103,6 @@ func (s *syncService) startPipeline(ctx context.Context) {
s.log.Errorw("Failed to process block", "err", b.err)
return
}
- fmt.Println("About to store a block")
err = s.blockchain.Store(b.block, b.commitments, b.stateUpdate, b.newClasses)
if err != nil {
s.log.Errorw("Failed to Store Block", "number", b.block.Number, "err", err)
@@ -116,12 +114,6 @@ func (s *syncService) startPipeline(ctx context.Context) {
}
}

- // Todo: Temp func, remove after debugging
- func closeAndLog[T any](label string, ch chan T) {
- fmt.Printf("Closing %q\n", label)
- close(ch)
- }
-
func (s *syncService) logError(msg string, err error) {
if !errors.Is(err, context.Canceled) {
s.log.Errorw(msg, "err", err)
@@ -134,7 +126,7 @@ func (s *syncService) processSpecBlockParts(ctx context.Context, startingBlockNu
orderedBlockBodiesCh := make(chan (<-chan blockBody))

go func() {
- defer closeAndLog("orderedBlockBodiesCh", orderedBlockBodiesCh)
+ defer close(orderedBlockBodiesCh)

specBlockHeadersAndSigsM := make(map[uint64]specBlockHeaderAndSigs)
specBlockBodiesM := make(map[uint64]specBlockBody)
@@ -226,7 +218,7 @@ func (s *syncService) adaptAndSanityCheckBlock(ctx context.Context, header *spec
) <-chan blockBody {
bodyCh := make(chan blockBody)
go func() {
- defer closeAndLog("bodyCh", bodyCh)
+ defer close(bodyCh)
select {
case <-ctx.Done():
bodyCh <- blockBody{err: ctx.Err()}
@@ -325,7 +317,7 @@ func (s *syncService) genHeadersAndSigs(ctx context.Context, it *spec.Iteration)

headersAndSigCh := make(chan specBlockHeaderAndSigs)
go func() {
- defer closeAndLog("headersAndSigCh", headersAndSigCh)
+ defer close(headersAndSigCh)

iteratorLoop:
for res, valid := headersIt(); valid; res, valid = headersIt() {
@@ -373,24 +365,20 @@ func (s *syncService) genBlockBodies(ctx context.Context, it *spec.Iteration) (<

specBodiesCh := make(chan specBlockBody)
go func() {
- defer closeAndLog("specBodiesCh", specBodiesCh)
+ defer close(specBodiesCh)
curBlockBody := new(specBlockBody)
// Assumes that all parts of the same block will arrive before the next block parts
// Todo: the above assumption may not be true. A peer may decide to send different parts of the block in different order
// If the above assumption is not true we should return separate channels for each of the parts. Also, see todo above specBlockBody
// on line 317 in p2p/sync.go
for res, valid := blockIt(); valid; res, valid = blockIt() {
- fmt.Println("Reading from block bodies iterator")
- spew.Dump(res.BodyMessage)
switch res.BodyMessage.(type) {
case *spec.BlockBodiesResponse_Classes:
if curBlockBody.id == nil {
- fmt.Println("Got block body part for block ID", res.Id.String())
curBlockBody.id = res.GetId()
}
curBlockBody.classes = res.GetClasses()
case *spec.BlockBodiesResponse_Diff:
- fmt.Println("Got StateDiff", res.Id.String())
if curBlockBody.id == nil {
curBlockBody.id = res.GetId()
}
@@ -433,7 +421,7 @@ func (s *syncService) genReceipts(ctx context.Context, it *spec.Iteration) (<-ch

receiptsCh := make(chan specReceipts)
go func() {
- defer closeAndLog("receiptsCh", receiptsCh)
+ defer close(receiptsCh)

for res, valid := receiptsIt(); valid; res, valid = receiptsIt() {
switch res.Responses.(type) {
@@ -468,7 +456,7 @@ func (s *syncService) genEvents(ctx context.Context, it *spec.Iteration) (<-chan

eventsCh := make(chan specEvents)
go func() {
- defer closeAndLog("eventsCh", eventsCh)
+ defer close(eventsCh)
for res, valid := eventsIt(); valid; res, valid = eventsIt() {
switch res.Responses.(type) {
case *spec.EventsResponse_Events:
@@ -501,7 +489,7 @@ func (s *syncService) genTransactions(ctx context.Context, it *spec.Iteration) (

txsCh := make(chan specTransactions)
go func() {
- defer closeAndLog("txsCh", txsCh)
+ defer close(txsCh)
for res, valid := txsIt(); valid; res, valid = txsIt() {
switch res.Responses.(type) {
case *spec.TransactionsResponse_Transactions:
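The recurring edit in this file swaps the temporary closeAndLog helper for a plain deferred close, restoring the usual ownership rule: the goroutine that sends on a channel closes it on every exit path, so consumers ranging over it terminate cleanly. A minimal sketch of that rule:

```go
package main

import "fmt"

// gen owns the channel it returns; defer close(out) runs on every exit
// path of the goroutine, so the range loop in main always finishes.
func gen(n int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := 0; i < n; i++ {
			out <- i
		}
	}()
	return out
}

func main() {
	for v := range gen(3) {
		fmt.Println(v) // 0, 1, 2
	}
}
```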
15 changes: 4 additions & 11 deletions utils/pipeline/pipeline.go
@@ -2,7 +2,6 @@ package pipeline

import (
"context"
- "fmt"
"sync"
)

@@ -50,13 +49,13 @@ func Stage[From any, To any](ctx context.Context, in <-chan From, f func(From) T
out := make(chan To)

if in == nil {
- closeAndLog("Stage in is nil", out)
+ close(out)
return out
}

// todo handle panic?
go func() {
- defer closeAndLog("Stage defer", out)
+ defer close(out)
for v := range in {
select {
case <-ctx.Done():
@@ -92,8 +91,7 @@ func FanIn[T any](ctx context.Context, channels ...<-chan T) <-chan T {

go func() {
wg.Wait()
- closeAndLog("FanIn", out)
- // close(out)
+ close(out)
}()

return out
@@ -121,7 +119,7 @@ func End[T any](in <-chan T, f func(T)) <-chan struct{} {
func Bridge[T any](ctx context.Context, chanCh <-chan <-chan T) <-chan T {
out := make(chan T)
go func() {
- defer closeAndLog("bridge", out)
+ defer close(out)
for {
var ch <-chan T
select {
@@ -152,8 +150,3 @@ func Bridge[T any](ctx context.Context, chanCh <-chan <-chan T) <-chan T {
}()
return out
}
-
- func closeAndLog[T any](label string, ch chan T) {
- fmt.Printf("Closing %q\n", label)
- close(ch)
- }
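Taken together, Stage, FanIn, and Bridge are ordinary pipeline combinators once the logging wrapper is gone. A composition sketch that re-implements stage and fanIn locally to match the shapes visible in this diff, so it runs stand-alone (the package's exact signatures may differ):

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// stage maps values from in to out until in closes or ctx is cancelled.
func stage[From, To any](ctx context.Context, in <-chan From, f func(From) To) <-chan To {
	out := make(chan To)
	go func() {
		defer close(out)
		for v := range in {
			select {
			case <-ctx.Done():
				return
			case out <- f(v):
			}
		}
	}()
	return out
}

// fanIn merges several channels into one, closing out only after all
// inputs drain, as the fixed FanIn above does with its single close(out).
func fanIn[T any](ctx context.Context, channels ...<-chan T) <-chan T {
	out := make(chan T)
	var wg sync.WaitGroup
	for _, ch := range channels {
		wg.Add(1)
		go func(ch <-chan T) {
			defer wg.Done()
			for v := range ch {
				select {
				case <-ctx.Done():
					return
				case out <- v:
				}
			}
		}(ch)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func gen(vals ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, v := range vals {
			out <- v
		}
	}()
	return out
}

func main() {
	ctx := context.Background()
	doubled := stage(ctx, gen(1, 2, 3), func(v int) int { return v * 2 })
	squared := stage(ctx, gen(4, 5), func(v int) int { return v * v })
	for v := range fanIn(ctx, doubled, squared) {
		fmt.Println(v) // 2 4 6 16 25, interleaved nondeterministically
	}
}
```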
