From b89e3f7d36913348a28c4f3f6c1d6ecdf55e2662 Mon Sep 17 00:00:00 2001
From: Adam Tucker
Date: Wed, 28 Feb 2024 19:07:40 -0700
Subject: [PATCH] blocksync: attempt multi-peer block fetching

---
 blocksync/pool.go    | 305 ++++++++++++++++++++++++++++++-------------
 blocksync/reactor.go |  63 +++++----
 2 files changed, 251 insertions(+), 117 deletions(-)

diff --git a/blocksync/pool.go b/blocksync/pool.go
index 9a95fe56214..c8a773daec1 100644
--- a/blocksync/pool.go
+++ b/blocksync/pool.go
@@ -4,7 +4,7 @@ import (
 	"errors"
 	"fmt"
 	"math"
-	"sync/atomic"
+	"math/rand"
 	"time"
 
 	flow "github.com/cometbft/cometbft/libs/flowrate"
@@ -28,11 +28,13 @@ eg, L = latency = 0.1s
 */
 
 const (
-	requestIntervalMS         = 2
-	maxTotalRequesters        = 600
+	requestIntervalMS  = 2
+	maxTotalRequesters = 50
+	// maxTotalRequesters = 1200
 	maxPendingRequests        = maxTotalRequesters
 	maxPendingRequestsPerPeer = 20
-	requestRetrySeconds       = 30
+	// maxPendingRequestsPerPeer = 50
+	requestRetrySeconds = 30
 
 	// Minimum recv rate to ensure we're receiving blocks from a peer fast
 	// enough. If a peer is not sending us data at at least that rate, we
@@ -42,11 +44,11 @@ const (
 	// sending data across atlantic ~ 7.5 KB/s.
 	minRecvRate = 7680
 
-	// Maximum difference between current and new block's height.
-	maxDiffBetweenCurrentAndReceivedBlockHeight = 100
+	// // Maximum difference between current and new block's height.
+	// maxDiffBetweenCurrentAndReceivedBlockHeight = 4
 )
 
-var peerTimeout = 15 * time.Second // not const so we can override with tests
+var peerTimeout = 10 * time.Second // not const so we can override with tests
 
 /*
 Peers self report their heights when we join the block pool.
@@ -68,13 +70,11 @@ type BlockPool struct {
 
 	// block requests
 	requesters map[int64]*bpRequester
 	height     int64 // the lowest key in requesters.
+	heightRecv map[int64]bool // heights that received at least one block; guarded by mtx
 
 	// peers
 	peers         map[p2p.ID]*bpPeer
 	maxPeerHeight int64 // the biggest reported height
 
-	// atomic
-	numPending int32 // number of requests pending assignment or block response
-
 	requestsCh chan<- BlockRequest
 	errorsCh   chan<- peerError
 }
@@ -87,7 +87,7 @@ func NewBlockPool(start int64, requestsCh chan<- BlockRequest, errorsCh chan<- p
 
 		requesters: make(map[int64]*bpRequester),
 		height:     start,
-		numPending: 0,
+		heightRecv: make(map[int64]bool),
 
 		requestsCh: requestsCh,
 		errorsCh:   errorsCh,
@@ -108,23 +108,15 @@ func (pool *BlockPool) OnStart() error {
 func (pool *BlockPool) makeRequestersRoutine() {
 	for {
 		if !pool.IsRunning() {
-			break
+			return
 		}
 
-		_, numPending, lenRequesters := pool.GetStatus()
+		_, lenRequesters := pool.GetStatus()
 		switch {
-		case numPending >= maxPendingRequests:
-			// sleep for a bit.
-			time.Sleep(requestIntervalMS * time.Millisecond)
-			// check for timed out peers
-			pool.removeTimedoutPeers()
 		case lenRequesters >= maxTotalRequesters:
-			// sleep for a bit.
 			time.Sleep(requestIntervalMS * time.Millisecond)
-			// check for timed out peers
 			pool.removeTimedoutPeers()
 		default:
-			// request for more blocks.
 			pool.makeNextRequester()
 		}
 	}
}
@@ -154,13 +146,12 @@ func (pool *BlockPool) removeTimedoutPeers() {
 	}
 }
 
-// GetStatus returns pool's height, numPending requests and the number of
-// requesters.
-func (pool *BlockPool) GetStatus() (height int64, numPending int32, lenRequesters int) {
+// GetStatus returns pool's height and the number of requesters.
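+// numPending was removed along with its atomic counter: with multi-peer
+// fetching, several requests can be in flight for the same height, so the
+// requester count is the meaningful measure of backlog.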
+func (pool *BlockPool) GetStatus() (height int64, lenRequesters int) {
 	pool.mtx.Lock()
 	defer pool.mtx.Unlock()
 
-	return pool.height, atomic.LoadInt32(&pool.numPending), len(pool.requesters)
+	return pool.height, len(pool.requesters)
 }
 
 // IsCaughtUp returns true if this node is caught up, false - otherwise.
@@ -219,6 +210,7 @@ func (pool *BlockPool) PopRequest() {
 			pool.Logger.Error("Error stopping requester", "err", err)
 		}
 		delete(pool.requesters, pool.height)
+		// Drop the received-marker as well so heightRecv does not grow without bound.
+		delete(pool.heightRecv, pool.height)
+		// This is where we increase the pool's height.
 		pool.height++
 	} else {
 		panic(fmt.Sprintf("Expected requester to pop, got nothing at height %v", pool.height))
@@ -228,17 +220,19 @@ func (pool *BlockPool) PopRequest() {
 
 // RedoRequest invalidates the block at pool.height,
-// Remove the peer and redo request from others.
-// Returns the ID of the removed peer.
-func (pool *BlockPool) RedoRequest(height int64) p2p.ID {
+// removes the peers that served it, and redoes the request from others.
+// Returns the IDs of the removed peers.
+func (pool *BlockPool) RedoRequest(height int64) []p2p.ID {
 	pool.mtx.Lock()
 	defer pool.mtx.Unlock()
 
 	request := pool.requesters[height]
-	peerID := request.getPeerID()
-	if peerID != p2p.ID("") {
+	peerIDs := request.getPeerID()
+	if peerIDs != nil {
 		// RemovePeer will redo all requesters associated with this peer.
-		pool.removePeer(peerID)
+		for _, peerID := range peerIDs {
+			pool.removePeer(peerID)
+		}
 	}
-	return peerID
+	return peerIDs
 }
 
 // AddBlock validates that the block comes from the peer it was expected from and calls the requester to store it.
@@ -261,14 +255,14 @@ func (pool *BlockPool) AddBlock(peerID p2p.ID, block *types.Block, blockSize int
 		if diff < 0 {
 			diff *= -1
 		}
-		if diff > maxDiffBetweenCurrentAndReceivedBlockHeight {
-			pool.sendError(errors.New("peer sent us a block we didn't expect with a height too far ahead/behind"), peerID)
-		}
+		// const maxDiff = 50 // maximum difference between current and received block height
+		// if diff > maxDiff {
+		// 	pool.sendError(errors.New("peer sent us a block we didn't expect with a height too far ahead/behind"), peerID)
+		// }
 		return
 	}
 
 	if requester.setBlock(block, peerID) {
-		atomic.AddInt32(&pool.numPending, -1)
 		peer := pool.peers[peerID]
 		if peer != nil {
 			peer.decrPending(blockSize)
@@ -316,11 +310,11 @@ func (pool *BlockPool) RemovePeer(peerID p2p.ID) {
 }
 
 func (pool *BlockPool) removePeer(peerID p2p.ID) {
-	for _, requester := range pool.requesters {
-		if requester.getPeerID() == peerID {
-			requester.redo(peerID)
-		}
-	}
+	// for _, requester := range pool.requesters {
+	// 	if idExists(peerID, requester.getPeerID()) {
+	// 		requester.redo(peerID)
+	// 	}
+	// }
 
 	peer, ok := pool.peers[peerID]
 	if ok {
@@ -349,13 +343,54 @@ func (pool *BlockPool) updateMaxPeerHeight() {
 	pool.maxPeerHeight = max
 }
 
-// Pick an available peer with the given height available.
-// If no peers are available, returns nil.
-func (pool *BlockPool) pickIncrAvailablePeer(height int64) *bpPeer {
+// // Pick an available peer with the given height available.
+// // If no peers are available, returns nil.
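+// // (Kept for reference only; superseded by pickIncrAvailablePeers below.)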
+// func (pool *BlockPool) pickIncrAvailablePeer(height int64) *bpPeer {
+// 	pool.mtx.Lock()
+// 	defer pool.mtx.Unlock()
+
+// 	for _, peer := range pool.peers {
+// 		if peer.didTimeout {
+// 			pool.removePeer(peer.id)
+// 			continue
+// 		}
+// 		if peer.numPending >= maxPendingRequestsPerPeer {
+// 			continue
+// 		}
+// 		if height < peer.base || height > peer.height {
+// 			continue
+// 		}
+// 		peer.incrPending()
+// 		return peer
+// 	}
+// 	return nil
+// }
+
+// pickIncrAvailablePeers picks up to peerLimit available peers that can serve
+// the given height, increments their pending counts, and returns them.
+func (pool *BlockPool) pickIncrAvailablePeers(height int64) []*bpPeer {
 	pool.mtx.Lock()
 	defer pool.mtx.Unlock()
 
+	// Convert the peer map to a slice for shuffling.
+	peerSlice := make([]*bpPeer, 0, len(pool.peers))
 	for _, peer := range pool.peers {
+		peerSlice = append(peerSlice, peer)
+	}
+
+	// Shuffle so load spreads across peers. The global source is seeded
+	// automatically; rand.Seed is deprecated since Go 1.20, and reseeding on
+	// every call would only weaken the shuffle.
+	rand.Shuffle(len(peerSlice), func(i, j int) {
+		peerSlice[i], peerSlice[j] = peerSlice[j], peerSlice[i]
+	})
+
+	var peers []*bpPeer
+	// Fan requests near the pool's height out to several peers; heights
+	// further ahead are requested from a single peer.
+	peerLimit := 3
+	if height > pool.height+10 {
+		peerLimit = 1
+	}
+	for _, peer := range peerSlice {
+		if len(peers) >= peerLimit {
+			break
+		}
 		if peer.didTimeout {
 			pool.removePeer(peer.id)
 			continue
@@ -367,9 +402,15 @@ func (pool *BlockPool) pickIncrAvailablePeer(height int64) *bpPeer {
 			continue
 		}
 		peer.incrPending()
-		return peer
+		peers = append(peers, peer)
 	}
-	return nil
+
+	return peers
 }
 
 func (pool *BlockPool) makeNextRequester() {
@@ -380,11 +421,11 @@ func (pool *BlockPool) makeNextRequester() {
 	if nextHeight > pool.maxPeerHeight {
 		return
 	}
 
 	request := newBPRequester(pool, nextHeight)
 
 	pool.requesters[nextHeight] = request
-	atomic.AddInt32(&pool.numPending, 1)
 
 	err := request.Start()
 	if err != nil {
@@ -396,11 +437,11 @@ func (pool *BlockPool) requestersLen() int64 {
 	return int64(len(pool.requesters))
 }
 
-func (pool *BlockPool) sendRequest(height int64, peerID p2p.ID) {
+func (pool *BlockPool) sendRequest(height int64, peerIDs []p2p.ID) {
 	if !pool.IsRunning() {
 		return
 	}
-	pool.requestsCh <- BlockRequest{height, peerID}
+	pool.requestsCh <- BlockRequest{height, peerIDs}
 }
 
 func (pool *BlockPool) sendError(err error, peerID p2p.ID) {
@@ -410,25 +451,25 @@ func (pool *BlockPool) sendError(err error, peerID p2p.ID) {
 	pool.errorsCh <- peerError{err, peerID}
 }
 
-// for debugging purposes
-//
-//nolint:unused
-func (pool *BlockPool) debug() string {
-	pool.mtx.Lock()
-	defer pool.mtx.Unlock()
-
-	str := ""
-	nextHeight := pool.height + pool.requestersLen()
-	for h := pool.height; h < nextHeight; h++ {
-		if pool.requesters[h] == nil {
-			str += fmt.Sprintf("H(%v):X ", h)
-		} else {
-			str += fmt.Sprintf("H(%v):", h)
-			str += fmt.Sprintf("B?(%v) ", pool.requesters[h].block != nil)
-		}
-	}
-	return str
-}
+// // for debugging purposes
+// //
+// //nolint:unused
+// func (pool *BlockPool) debug() string {
+// 	pool.mtx.Lock()
+// 	defer pool.mtx.Unlock()
+
+// 	str := ""
+// 	nextHeight := pool.height + pool.requestersLen()
+// 	for h := pool.height; h < nextHeight; h++ {
+// 		if pool.requesters[h] == nil {
+// 			str += fmt.Sprintf("H(%v):X ", h)
+// 		} else {
+// 			str += fmt.Sprintf("H(%v):", h)
+// 			str += fmt.Sprintf("B?(%v) ", pool.requesters[h].block != nil)
+// 		}
+// 	}
+// 	return str
+// }
 
 //-------------------------------------
 
@@ -513,9 +554,9 @@ type bpRequester struct {
 	gotBlockCh chan struct{}
 	redoCh     chan p2p.ID // redo may be sent multiple times; the peer ID identifies repeats
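+	// peerIDs replaces the former single peerID: the same block may now be
+	// requested from several peers at once.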
-	mtx    cmtsync.Mutex
-	peerID p2p.ID
-	block  *types.Block
+	mtx     cmtsync.Mutex
+	peerIDs []p2p.ID
+	block   *types.Block
 }
 
 func newBPRequester(pool *BlockPool, height int64) *bpRequester {
@@ -525,8 +566,8 @@ func newBPRequester(pool *BlockPool, height int64) *bpRequester {
 		gotBlockCh: make(chan struct{}, 1),
 		redoCh:     make(chan p2p.ID, 1),
 
-		peerID: "",
-		block:  nil,
+		peerIDs: nil,
+		block:   nil,
 	}
 	bpr.BaseService = *service.NewBaseService(nil, "bpRequester", bpr)
 	return bpr
@@ -540,7 +581,7 @@ func (bpr *bpRequester) OnStart() error {
-// Returns true if the peer matches and block doesn't already exist.
+// Returns true if the peer is one of the expected peers and the block doesn't already exist.
 func (bpr *bpRequester) setBlock(block *types.Block, peerID p2p.ID) bool {
 	bpr.mtx.Lock()
-	if bpr.block != nil || bpr.peerID != peerID {
+	if bpr.block != nil || !idExists(peerID, bpr.peerIDs) {
 		bpr.mtx.Unlock()
 		return false
 	}
@@ -560,10 +601,10 @@ func (bpr *bpRequester) getBlock() *types.Block {
 	return bpr.block
 }
 
-func (bpr *bpRequester) getPeerID() p2p.ID {
+func (bpr *bpRequester) getPeerID() []p2p.ID {
 	bpr.mtx.Lock()
 	defer bpr.mtx.Unlock()
-	return bpr.peerID
+	return bpr.peerIDs
 }
 
 // This is called from the requestRoutine, upon redo().
@@ -571,12 +612,9 @@ func (bpr *bpRequester) reset() {
 	bpr.mtx.Lock()
 	defer bpr.mtx.Unlock()
 
-	if bpr.block != nil {
-		atomic.AddInt32(&bpr.pool.numPending, 1)
-	}
-
-	bpr.peerID = ""
+	bpr.peerIDs = nil
 	bpr.block = nil
 }
 
 // Tells bpRequester to pick another peer and try again.
@@ -589,20 +627,78 @@ func (bpr *bpRequester) redo(peerID p2p.ID) {
 	}
 }
 
-// Responsible for making more requests as necessary
-// Returns only when a block is found (e.g. AddBlock() is called)
+// // Responsible for making more requests as necessary
+// // Returns only when a block is found (e.g. AddBlock() is called)
+// func (bpr *bpRequester) requestRoutine() {
+// OUTER_LOOP:
+// 	for {
+// 		// Pick a peer to send request to.
+// 		var peer *bpPeer
+// 	PICK_PEER_LOOP:
+// 		for {
+// 			if !bpr.IsRunning() || !bpr.pool.IsRunning() {
+// 				return
+// 			}
+// 			peer = bpr.pool.pickIncrAvailablePeer(bpr.height)
+// 			if peer == nil {
+// 				bpr.Logger.Debug("No peers currently available; will retry shortly", "height", bpr.height)
+// 				time.Sleep(requestIntervalMS * time.Millisecond)
+// 				continue PICK_PEER_LOOP
+// 			}
+// 			break PICK_PEER_LOOP
+// 		}
+// 		bpr.mtx.Lock()
+// 		bpr.peerID = peer.id
+// 		bpr.mtx.Unlock()
+
+// 		to := time.NewTimer(requestRetrySeconds * time.Second)
+// 		// Send request and wait.
+// 		bpr.pool.sendRequest(bpr.height, peer.id)
+// 	WAIT_LOOP:
+// 		for {
+// 			select {
+// 			case <-bpr.pool.Quit():
+// 				if err := bpr.Stop(); err != nil {
+// 					bpr.Logger.Error("Error stopped requester", "err", err)
+// 				}
+// 				return
+// 			case <-bpr.Quit():
+// 				return
+// 			case <-to.C:
+// 				bpr.Logger.Debug("Retrying block request after timeout", "height", bpr.height, "peer", bpr.peerID)
+// 				// Simulate a redo
+// 				bpr.reset()
+// 				continue OUTER_LOOP
+// 			case peerID := <-bpr.redoCh:
+// 				if peerID == bpr.peerID {
+// 					bpr.reset()
+// 					continue OUTER_LOOP
+// 				} else {
+// 					continue WAIT_LOOP
+// 				}
+// 			case <-bpr.gotBlockCh:
+// 				// We got a block!
+// 				// Continue the for-loop and wait til Quit.
+// 				continue WAIT_LOOP
+// 			}
+// 		}
+// 	}
+// }
+
+// requestRoutine makes block requests for bpr.height until a block is
+// received (i.e. AddBlock() is called), retrying on timeout or redo.
 func (bpr *bpRequester) requestRoutine() {
 OUTER_LOOP:
 	for {
-		// Pick a peer to send request to.
-		var peer *bpPeer
+		// Pick peers to send the request to.
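+		// The request is fanned out to every picked peer; the first block
+		// stored via setBlock wins, and later responses are ignored.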
+		var peers []*bpPeer
 	PICK_PEER_LOOP:
 		for {
 			if !bpr.IsRunning() || !bpr.pool.IsRunning() {
 				return
 			}
-			peer = bpr.pool.pickIncrAvailablePeer(bpr.height)
-			if peer == nil {
+			peers = bpr.pool.pickIncrAvailablePeers(bpr.height)
+			if len(peers) == 0 {
 				bpr.Logger.Debug("No peers currently available; will retry shortly", "height", bpr.height)
 				time.Sleep(requestIntervalMS * time.Millisecond)
 				continue PICK_PEER_LOOP
@@ -610,12 +706,19 @@ OUTER_LOOP:
 			break PICK_PEER_LOOP
 		}
 		bpr.mtx.Lock()
-		bpr.peerID = peer.id
+		bpr.peerIDs = nil
+		for _, peer := range peers {
+			bpr.peerIDs = append(bpr.peerIDs, peer.id)
+		}
+		peerIDs := bpr.peerIDs
 		bpr.mtx.Unlock()
 
 		to := time.NewTimer(requestRetrySeconds * time.Second)
 		// Send request and wait.
-		bpr.pool.sendRequest(bpr.height, peer.id)
+		// (Send outside the requester lock: requestsCh can block, and holding
+		// bpr.mtx while sending could stall setBlock on the receive path.)
+		bpr.pool.sendRequest(bpr.height, peerIDs)
 	WAIT_LOOP:
 		for {
 			select {
 			case <-bpr.pool.Quit():
 				if err := bpr.Stop(); err != nil {
 					bpr.Logger.Error("Error stopped requester", "err", err)
 				}
 				return
 			case <-bpr.Quit():
 				return
 			case <-to.C:
-				bpr.Logger.Debug("Retrying block request after timeout", "height", bpr.height, "peer", bpr.peerID)
+				// heightRecv is shared state; read it under the pool's lock.
+				bpr.pool.mtx.Lock()
+				received := bpr.pool.heightRecv[bpr.height]
+				bpr.pool.mtx.Unlock()
+				if received {
+					// A block for this height already arrived; nothing to retry.
+					continue WAIT_LOOP
+				}
+				bpr.Logger.Debug("Retrying block request after timeout", "height", bpr.height, "peers", bpr.peerIDs)
 				// Simulate a redo
 				bpr.reset()
 				continue OUTER_LOOP
 			case peerID := <-bpr.redoCh:
-				if peerID == bpr.peerID {
+				bpr.pool.mtx.Lock()
+				received := bpr.pool.heightRecv[bpr.height]
+				bpr.pool.mtx.Unlock()
+				if received {
+					continue WAIT_LOOP
+				}
+				if idExists(peerID, bpr.peerIDs) {
 					bpr.reset()
 					continue OUTER_LOOP
 				} else {
 					continue WAIT_LOOP
 				}
 			case <-bpr.gotBlockCh:
 				// We got a block!
 				// Continue the for-loop and wait til Quit.
+				bpr.pool.mtx.Lock()
+				bpr.pool.heightRecv[bpr.height] = true
+				bpr.pool.mtx.Unlock()
 				continue WAIT_LOOP
 			}
 		}
 	}
 }
 
+// idExists reports whether id is present in ids.
+func idExists(id p2p.ID, ids []p2p.ID) bool {
+	for _, existingID := range ids {
+		if existingID == id {
+			return true
+		}
+	}
+	return false
+}
+
-// BlockRequest stores a block request identified by the block Height and the PeerID responsible for
-// delivering the block
+// BlockRequest stores a block request identified by the block Height and the
+// PeerIDs of the peers responsible for delivering the block.
 type BlockRequest struct {
-	Height int64
-	PeerID p2p.ID
+	Height  int64
+	PeerIDs []p2p.ID
 }
diff --git a/blocksync/reactor.go b/blocksync/reactor.go
index d6c01c81e44..f24688af4c5 100644
--- a/blocksync/reactor.go
+++ b/blocksync/reactor.go
@@ -234,7 +234,10 @@ func (bcR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
 			bcR.Switch.StopPeerForError(e.Src, err)
 			return
 		}
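+		// Blocks below the pool's current height are already committed, so
+		// drop them here rather than handing them back to the pool.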
-		bcR.pool.AddBlock(e.Src.ID(), bi, msg.Block.Size())
+		// Read the pool height via GetStatus so the access happens under the
+		// pool's lock rather than touching pool.height directly.
+		poolHeight, _ := bcR.pool.GetStatus()
+		if poolHeight <= bi.Height {
+			bcR.pool.AddBlock(e.Src.ID(), bi, msg.Block.Size())
+		}
 
 	case *bcproto.StatusRequest:
 		// Send peer our state.
 		e.Src.TrySendEnvelope(p2p.Envelope{
@@ -285,16 +288,20 @@ func (bcR *Reactor) poolRoutine(stateSynced bool) {
 		case <-bcR.pool.Quit():
 			return
 		case request := <-bcR.requestsCh:
-			peer := bcR.Switch.Peers().Get(request.PeerID)
-			if peer == nil {
-				continue
-			}
-			queued := peer.TrySendEnvelope(p2p.Envelope{
-				ChannelID: BlocksyncChannel,
-				Message:   &bcproto.BlockRequest{Height: request.Height},
-			})
-			if !queued {
-				bcR.Logger.Debug("Send queue is full, drop block request", "peer", peer.ID(), "height", request.Height)
+			// Fan the request out to every peer picked for this height.
+			for _, peerID := range request.PeerIDs {
+				peer := bcR.Switch.Peers().Get(peerID)
+				if peer == nil {
+					continue
+				}
+				queued := peer.TrySendEnvelope(p2p.Envelope{
+					ChannelID: BlocksyncChannel,
+					Message:   &bcproto.BlockRequest{Height: request.Height},
+				})
+				if !queued {
+					bcR.Logger.Debug("Send queue is full, drop block request", "peer", peer.ID(), "height", request.Height)
+				}
 			}
 		case err := <-bcR.errorsCh:
 			peer := bcR.Switch.Peers().Get(err.peerID)
@@ -314,9 +321,9 @@ FOR_LOOP:
 	for {
 		select {
 		case <-switchToConsensusTicker.C:
-			height, numPending, lenRequesters := bcR.pool.GetStatus()
+			height, lenRequesters := bcR.pool.GetStatus()
 			outbound, inbound, _ := bcR.Switch.NumPeers()
-			bcR.Logger.Debug("Consensus ticker", "numPending", numPending, "total", lenRequesters,
+			bcR.Logger.Debug("Consensus ticker", "total", lenRequesters,
 				"outbound", outbound, "inbound", inbound)
 			if bcR.pool.IsCaughtUp() {
 				bcR.Logger.Info("Time to switch to consensus reactor!", "height", height)
@@ -397,20 +404,26 @@ FOR_LOOP:
 			}
 
 			bcR.Logger.Error("Error in validation", "err", err)
-			peerID := bcR.pool.RedoRequest(first.Height)
-			peer := bcR.Switch.Peers().Get(peerID)
-			if peer != nil {
-				// NOTE: we've already removed the peer's request, but we
-				// still need to clean up the rest.
-				bcR.Switch.StopPeerForError(peer, fmt.Errorf("Reactor validation error: %v", err))
+			peerIDs := bcR.pool.RedoRequest(first.Height)
+			for _, p := range peerIDs {
+				peer := bcR.Switch.Peers().Get(p)
+				if peer != nil {
+					// NOTE: we've already removed the peer's request, but we
+					// still need to clean up the rest.
+					bcR.Switch.StopPeerForError(peer, fmt.Errorf("Reactor validation error: %v", err))
+				}
 			}
-			peerID2 := bcR.pool.RedoRequest(second.Height)
-			peer2 := bcR.Switch.Peers().Get(peerID2)
-			if peer2 != nil && peer2 != peer {
-				// NOTE: we've already removed the peer's request, but we
-				// still need to clean up the rest.
-				bcR.Switch.StopPeerForError(peer2, fmt.Errorf("Reactor validation error: %v", err))
+
+			peerIDs2 := bcR.pool.RedoRequest(second.Height)
+			for _, p := range peerIDs2 {
+				peer2 := bcR.Switch.Peers().Get(p)
+				// Skip peers that were already stopped in the loop above.
+				if peer2 != nil && !idExists(p, peerIDs) {
+					// NOTE: we've already removed the peer's request, but we
+					// still need to clean up the rest.
+					bcR.Switch.StopPeerForError(peer2, fmt.Errorf("Reactor validation error: %v", err))
+				}
 			}
+
 			continue FOR_LOOP
 		}