Skip to content

Commit

Permalink
merge main
Browse files Browse the repository at this point in the history
  • Loading branch information
czarcas7ic committed Mar 11, 2024
2 parents 39024eb + 3660715 commit 46dd3da
Show file tree
Hide file tree
Showing 8 changed files with 100 additions and 91 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
- `[state]` avoid double-saving `FinalizeBlockResponse` for performance reasons
([\#2017](https://github.com/cometbft/cometbft/pull/2017))
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
- `[blocksync]` make the max number of downloaded blocks dynamic.
Previously it was a const 600. Now it's `peersCount * maxPendingRequestsPerPeer (20)`
[\#2467](https://github.com/cometbft/cometbft/pull/2467)
14 changes: 14 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,19 @@
# CHANGELOG

## Unreleased

* [#5](https://github.com/osmosis-labs/cometbft/pull/5) Batch verification
* [#11](https://github.com/osmosis-labs/cometbft/pull/11) Skip verification of commit sigs
* [#13](https://github.com/osmosis-labs/cometbft/pull/13) Avoid double-saving ABCI responses
* [#17](https://github.com/osmosis-labs/cometbft/pull/17) Set the max number of (concurrently) downloaded blocks to {peersCount * 20}

## osmo-v23/v0.37.4-2

* [#3](https://github.com/osmosis-labs/cometbft/pull/3) Avoid double-calling types.BlockFromProto
* [#4](https://github.com/osmosis-labs/cometbft/pull/4) Do not validatorBlock twice

## osmo-v23/v0.37.4-1

## v0.37.4

*November 27, 2023*
Expand Down
39 changes: 14 additions & 25 deletions blocksync/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"math"
"sort"
"sync/atomic"
"time"

flow "github.com/cometbft/cometbft/libs/flowrate"
Expand Down Expand Up @@ -58,7 +57,7 @@ var peerTimeout = 7 * time.Second // not const so we can override with tests
Requests are continuously made for blocks of higher heights until
the limit is reached. If most of the requests have no available peers, and we
are not at peer limits, we can probably switch to consensus reactor
are not at peer limits, we can probably switch to consensus reactor.
*/

// BlockPool keeps track of the block sync peers, block requests and block responses.
Expand All @@ -76,9 +75,6 @@ type BlockPool struct {
sortedPeers []*bpPeer // sorted by curRate, highest first
maxPeerHeight int64 // the biggest reported height

// atomic
numPending int32 // number of requests pending assignment or block response

requestsCh chan<- BlockRequest
errorsCh chan<- peerError
}
Expand All @@ -92,7 +88,6 @@ func NewBlockPool(start int64, requestsCh chan<- BlockRequest, errorsCh chan<- p
requesters: make(map[int64]*bpRequester),
height: start,
startHeight: start,
numPending: 0,

requestsCh: requestsCh,
errorsCh: errorsCh,
Expand All @@ -109,7 +104,6 @@ func (pool *BlockPool) OnStart() error {
return nil
}

// spawns requesters as needed
func (pool *BlockPool) makeRequestersRoutine() {
for {
if !pool.IsRunning() {
Expand Down Expand Up @@ -182,25 +176,16 @@ func (pool *BlockPool) removeTimedoutPeers() {
pool.sortPeers()
}

// GetStatus returns pool's height, numPending requests and the number of
// requesters.
func (pool *BlockPool) GetStatus() (height int64, numPending int32, lenRequesters int) {
pool.mtx.Lock()
defer pool.mtx.Unlock()

return pool.height, atomic.LoadInt32(&pool.numPending), len(pool.requesters)
}

// IsCaughtUp returns true if this node is caught up, false - otherwise.
// TODO: relax conditions, prevent abuse.
func (pool *BlockPool) IsCaughtUp() bool {
func (pool *BlockPool) IsCaughtUp() (isCaughtUp bool, height, maxPeerHeight int64) {
pool.mtx.Lock()
defer pool.mtx.Unlock()

// Need at least 1 peer to be considered caught up.
if len(pool.peers) == 0 {
pool.Logger.Debug("Blockpool has no peers")
return false
return false, pool.height, pool.maxPeerHeight
}

// Some conditions to determine if we're caught up.
Expand All @@ -210,8 +195,8 @@ func (pool *BlockPool) IsCaughtUp() bool {
// to verify the LastCommit.
receivedBlockOrTimedOut := pool.height > 0 || time.Since(pool.startTime) > 5*time.Second
ourChainIsLongestAmongPeers := pool.maxPeerHeight == 0 || pool.height >= (pool.maxPeerHeight-1)
isCaughtUp := receivedBlockOrTimedOut && ourChainIsLongestAmongPeers
return isCaughtUp
isCaughtUp = receivedBlockOrTimedOut && ourChainIsLongestAmongPeers
return isCaughtUp, pool.height, pool.maxPeerHeight
}

// PeekTwoBlocks returns blocks at pool.height and pool.height+1.
Expand Down Expand Up @@ -311,13 +296,19 @@ func (pool *BlockPool) AddBlock(peerID p2p.ID, block *types.Block, blockSize int
pool.sendError(err, peerID)
}

atomic.AddInt32(&pool.numPending, -1)
peer := pool.peers[peerID]
if peer != nil {
peer.decrPending(blockSize)
}
}

// Height returns the pool's height.
func (pool *BlockPool) Height() int64 {
pool.mtx.Lock()
defer pool.mtx.Unlock()
return pool.height
}

// MaxPeerHeight returns the highest reported height.
func (pool *BlockPool) MaxPeerHeight() int64 {
pool.mtx.Lock()
Expand Down Expand Up @@ -437,9 +428,8 @@ func (pool *BlockPool) makeNextRequester(nextHeight int64) {
defer pool.mtx.Unlock()

request := newBPRequester(pool, nextHeight)

pool.requesters[nextHeight] = request
atomic.AddInt32(&pool.numPending, 1)
pool.mtx.Unlock()

if err := request.Start(); err != nil {
request.Logger.Error("Error starting request", "err", err)
Expand Down Expand Up @@ -659,7 +649,6 @@ func (bpr *bpRequester) reset(peerID p2p.ID) (removedBlock bool) {

// Only remove the block if we got it from that peer.
if bpr.gotBlockFrom == peerID {
atomic.AddInt32(&bpr.pool.numPending, 1)
bpr.block = nil
bpr.gotBlockFrom = ""
removedBlock = true
Expand Down Expand Up @@ -745,7 +734,7 @@ OUTER_LOOP:
for {
bpr.pickPeerAndSendRequest()

poolHeight, _, _ := bpr.pool.GetStatus()
poolHeight := bpr.pool.Height()
if bpr.height-poolHeight < minBlocksForSingleRequest {
bpr.pickSecondPeerAndSendRequest()
}
Expand Down
33 changes: 15 additions & 18 deletions blocksync/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,16 +98,13 @@ func TestBlockPoolBasic(t *testing.T) {
}
})

for _, peer := range peers {
pool.SetPeerRange(peer.id, peer.base, peer.height)
}

peers.start()
defer peers.stop()

// Introduce each peer.
go func() {
for _, peer := range peers {
pool.SetPeerRange(peer.id, peer.base, peer.height)
}
}()

// Start a goroutine to pull blocks
go func() {
for {
Expand All @@ -118,7 +115,7 @@ func TestBlockPoolBasic(t *testing.T) {
if first != nil && second != nil {
pool.PopRequest()
} else {
time.Sleep(1 * time.Second)
time.Sleep(10 * time.Millisecond)
}
}
}()
Expand All @@ -133,8 +130,10 @@ func TestBlockPoolBasic(t *testing.T) {
if request.Height == 300 {
return // Done!
}

peers[request.PeerID].inputChan <- inputData{t, pool, request}
case <-time.After(10 * time.Second):
t.Error("Timed out waiting for block requests")
return
}
}
}
Expand All @@ -148,27 +147,22 @@ func TestBlockPoolTimeout(t *testing.T) {
)
pool := NewBlockPool(start, requestsCh, errorsCh)
pool.SetLogger(log.TestingLogger())

err := pool.Start()
if err != nil {
t.Error(err)
}

t.Cleanup(func() {
if err := pool.Stop(); err != nil {
t.Error(err)
}
})

for _, peer := range peers {
t.Logf("Peer %v", peer.id)
pool.SetPeerRange(peer.id, peer.base, peer.height)
}

// Introduce each peer.
go func() {
for _, peer := range peers {
pool.SetPeerRange(peer.id, peer.base, peer.height)
}
}()

// Start a goroutine to pull blocks
go func() {
for {
Expand All @@ -179,7 +173,7 @@ func TestBlockPoolTimeout(t *testing.T) {
if first != nil && second != nil {
pool.PopRequest()
} else {
time.Sleep(1 * time.Second)
time.Sleep(10 * time.Millisecond)
}
}
}()
Expand All @@ -200,6 +194,9 @@ func TestBlockPoolTimeout(t *testing.T) {
}
case request := <-requestsCh:
t.Logf("Pulled new BlockRequest %+v", request)
case <-time.After(10 * time.Second):
t.Error("Timed out waiting for block requests")
return
}
}
}
Expand Down
11 changes: 5 additions & 6 deletions blocksync/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,11 +317,10 @@ FOR_LOOP:
for {
select {
case <-switchToConsensusTicker.C:
height, numPending, lenRequesters := bcR.pool.GetStatus()
outbound, inbound, _ := bcR.Switch.NumPeers()
bcR.Logger.Debug("Consensus ticker", "numPending", numPending, "total", lenRequesters,
"outbound", outbound, "inbound", inbound)
if bcR.pool.IsCaughtUp() {
bcR.Logger.Debug("Consensus ticker", "outbound", outbound, "inbound", inbound, "lastHeight", state.LastBlockHeight)

if isCaughtUp, height, _ := bcR.pool.IsCaughtUp(); isCaughtUp {
bcR.Logger.Info("Time to switch to consensus reactor!", "height", height)
if err := bcR.pool.Stop(); err != nil {
bcR.Logger.Error("Error stopping pool", "err", err)
Expand Down Expand Up @@ -432,9 +431,9 @@ FOR_LOOP:
blocksSynced++

if blocksSynced%100 == 0 {
_, height, maxPeerHeight := bcR.pool.IsCaughtUp()
lastRate = 0.9*lastRate + 0.1*(100/time.Since(lastHundred).Seconds())
bcR.Logger.Info("Block Sync Rate", "height", bcR.pool.height,
"max_peer_height", bcR.pool.MaxPeerHeight(), "blocks/s", lastRate)
bcR.Logger.Info("Block Sync Rate", "height", height, "max_peer_height", maxPeerHeight, "blocks/s", lastRate)
lastHundred = time.Now()
}

Expand Down
7 changes: 4 additions & 3 deletions blocksync/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func TestNoBlockResponse(t *testing.T) {
}

for {
if reactorPairs[1].reactor.pool.IsCaughtUp() {
if isCaughtUp, _, _ := reactorPairs[1].reactor.pool.IsCaughtUp(); isCaughtUp {
break
}

Expand Down Expand Up @@ -256,7 +256,7 @@ func TestBadBlockStopsPeer(t *testing.T) {
time.Sleep(1 * time.Second)
caughtUp := true
for _, r := range reactorPairs {
if !r.reactor.pool.IsCaughtUp() {
if isCaughtUp, _, _ := r.reactor.pool.IsCaughtUp(); !isCaughtUp {
caughtUp = false
}
}
Expand Down Expand Up @@ -285,7 +285,8 @@ func TestBadBlockStopsPeer(t *testing.T) {
}

for {
if lastReactorPair.reactor.pool.IsCaughtUp() || lastReactorPair.reactor.Switch.Peers().Size() == 0 {
isCaughtUp, _, _ := lastReactorPair.reactor.pool.IsCaughtUp()
if isCaughtUp || lastReactorPair.reactor.Switch.Peers().Size() == 0 {
break
}

Expand Down
Loading

0 comments on commit 46dd3da

Please sign in to comment.