Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gossip Timeout #310

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion internal/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,8 @@ func NewKoinosP2PNode(ctx context.Context, listenAddr string, localRPC rpc.Local
node.PeerErrorChan,
node.Host.ID(),
node,
node.Applicator)
node.Applicator,
config.GossipOptions)

node.ConnectionManager = p2p.NewConnectionManager(
node.Host,
Expand Down
2 changes: 2 additions & 0 deletions internal/options/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ type Config struct {
GossipToggleOptions GossipToggleOptions
ApplicatorOptions ApplicatorOptions
ConnectionManagerOptions ConnectionManagerOptions
GossipOptions GossipOptions
}

// NewConfig creates a new Config
Expand All @@ -19,6 +20,7 @@ func NewConfig() *Config {
GossipToggleOptions: *NewGossipToggleOptions(),
ApplicatorOptions: *NewApplicatorOptions(),
ConnectionManagerOptions: *NewConnectionManagerOptions(),
GossipOptions: *NewGossipOptions(),
}
return &config
}
22 changes: 22 additions & 0 deletions internal/options/gossip_options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package options

import "time"

const (
blockTimeoutDefault = 2 * time.Second
transactionTimeoutDefault = 1 * time.Second
)

// GossipOptions are options for Gossip
type GossipOptions struct {
BlockTimeout time.Duration
TransactionTimeout time.Duration
}

// New GossipOptions returns default initialized GossipOptions
func NewGossipOptions() *GossipOptions {
return &GossipOptions{
BlockTimeout: blockTimeoutDefault,
TransactionTimeout: transactionTimeoutDefault,
}
}
6 changes: 3 additions & 3 deletions internal/options/peer_connection_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ const (
blockRequestTimeoutDefault = time.Second * time.Duration(blockRequestBatchSizeDefault*0.3) // Roughly calculated considering 30Mbps minimum upload speed and 1MB blocks
handshakeRetryTimeDefault = time.Second * 6
syncedBlockDeltaDefault = 5
syncedPingTimeDefault = time.Second * 10
syncedSleepTimeDefault = time.Second * 10
)

// PeerConnectionOptions are options for PeerConnection
Expand All @@ -33,7 +33,7 @@ type PeerConnectionOptions struct {
BlockRequestTimeout time.Duration
HandshakeRetryTime time.Duration
SyncedBlockDelta uint64
SyncedPingTime time.Duration
SyncedSleepTime time.Duration
}

// NewPeerConnectionOptions returns default initialized PeerConnectionOptions
Expand All @@ -47,6 +47,6 @@ func NewPeerConnectionOptions() *PeerConnectionOptions {
BlockRequestTimeout: blockRequestTimeoutDefault,
HandshakeRetryTime: handshakeRetryTimeDefault,
SyncedBlockDelta: syncedBlockDeltaDefault,
SyncedPingTime: syncedPingTimeDefault,
SyncedSleepTime: syncedSleepTimeDefault,
}
}
39 changes: 24 additions & 15 deletions internal/p2p/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

log "github.com/koinos/koinos-log-golang/v2"
"github.com/koinos/koinos-p2p/internal/options"
"github.com/koinos/koinos-p2p/internal/p2perrors"
"github.com/koinos/koinos-p2p/internal/rpc"
"github.com/koinos/koinos-proto-golang/v2/koinos/canonical"
Expand Down Expand Up @@ -159,7 +160,8 @@ type KoinosGossip struct {
myPeerID peer.ID
libProvider LastIrreversibleBlockProvider
applicator *Applicator
reportCancel *context.CancelFunc
opts options.GossipOptions
gossipCancel *context.CancelFunc
recentBlocks uint32
recentTrxs uint32
}
Expand All @@ -172,7 +174,8 @@ func NewKoinosGossip(
peerErrorChan chan<- PeerError,
id peer.ID,
libProvider LastIrreversibleBlockProvider,
applicator *Applicator) *KoinosGossip {
applicator *Applicator,
opts options.GossipOptions) *KoinosGossip {

block := NewGossipManager(ps, peerErrorChan, BlockTopicName)
transaction := NewGossipManager(ps, peerErrorChan, TransactionTopicName)
Expand All @@ -185,6 +188,7 @@ func NewKoinosGossip(
myPeerID: id,
libProvider: libProvider,
applicator: applicator,
opts: opts,
}

return &kg
Expand All @@ -208,10 +212,10 @@ func (kg *KoinosGossip) EnableGossip(ctx context.Context, enable bool) {
// StartGossip enables gossip of blocks and transactions
func (kg *KoinosGossip) StartGossip(ctx context.Context) {
log.Info("Starting gossip mode")
kg.startBlockGossip(ctx)
kg.startTransactionGossip(ctx)
reportCtx, reportCancel := context.WithCancel(ctx)
kg.reportCancel = &reportCancel
gossipCtx, gossipCancel := context.WithCancel(ctx)
kg.gossipCancel = &gossipCancel
kg.startBlockGossip(gossipCtx)
kg.startTransactionGossip(gossipCtx)

go func() {
for {
Expand All @@ -223,7 +227,7 @@ func (kg *KoinosGossip) StartGossip(ctx context.Context) {
if numBlocks > 0 || numTrxs > 0 {
log.Infof("Recently gossiped %v block(s) and %v transaction(s)", numBlocks, numTrxs)
}
case <-reportCtx.Done():
case <-gossipCtx.Done():
return
}
}
Expand All @@ -235,9 +239,9 @@ func (kg *KoinosGossip) StopGossip() {
log.Info("Stopping gossip mode")
kg.block.Stop()
kg.transaction.Stop()
if kg.reportCancel != nil {
(*kg.reportCancel)()
kg.reportCancel = nil
if kg.gossipCancel != nil {
(*kg.gossipCancel)()
kg.gossipCancel = nil
}
}

Expand All @@ -254,7 +258,7 @@ func (kg *KoinosGossip) PublishTransaction(ctx context.Context, transaction *pro

log.Debugf("Publishing transaction - %s", util.TransactionString(transaction))
atomic.AddUint32(&kg.recentTrxs, 1)
kg.transaction.PublishMessage(context.Background(), binary)
kg.transaction.PublishMessage(ctx, binary)
}

return nil
Expand All @@ -273,7 +277,7 @@ func (kg *KoinosGossip) PublishBlock(ctx context.Context, block *protocol.Block)

log.Debugf("Publishing block - %s", util.BlockString(block))
atomic.AddUint32(&kg.recentBlocks, 1)
kg.block.PublishMessage(context.Background(), binary)
kg.block.PublishMessage(ctx, binary)
}

return nil
Expand Down Expand Up @@ -361,8 +365,10 @@ func (kg *KoinosGossip) applyBlock(ctx context.Context, pid peer.ID, msg *pubsub

log.Debugf("Pushing gossip block - %s from peer %v", util.BlockString(block), msg.ReceivedFrom)

// TODO: Fix nil argument
if err := kg.applicator.ApplyBlock(ctx, block); err != nil {
applyBlockContext, cancelApplyBlock := context.WithTimeout(ctx, kg.opts.BlockTimeout)
defer cancelApplyBlock()

if err := kg.applicator.ApplyBlock(applyBlockContext, block); err != nil {
return fmt.Errorf("%w - %s, %v", p2perrors.ErrBlockApplication, util.BlockString(block), err.Error())
}

Expand Down Expand Up @@ -426,7 +432,10 @@ func (kg *KoinosGossip) applyTransaction(ctx context.Context, pid peer.ID, msg *
return fmt.Errorf("%w, gossiped transaction missing id", p2perrors.ErrDeserialization)
}

if err := kg.applicator.ApplyTransaction(ctx, transaction); err != nil {
applyTransactionContext, cancelApplyTransaction := context.WithTimeout(ctx, kg.opts.TransactionTimeout)
defer cancelApplyTransaction()

if err := kg.applicator.ApplyTransaction(applyTransactionContext, transaction); err != nil {
if errors.Is(err, p2perrors.ErrInvalidNonce) {
return fmt.Errorf("%w - %s, %v", p2perrors.ErrInvalidNonce, util.TransactionString(transaction), err.Error())
}
Expand Down
66 changes: 29 additions & 37 deletions internal/p2p/peer_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,13 @@ import (
"github.com/multiformats/go-multihash"
)

type signalRequestBlocks struct{}

// PeerConnection handles the sync portion of a connection to a peer
type PeerConnection struct {
id peer.ID
version *semver.Version
isSynced bool
opts *options.PeerConnectionOptions

requestBlockChan chan signalRequestBlocks

libProvider LastIrreversibleBlockProvider
localRPC rpc.LocalRPC
peerRPC rpc.RemoteRPC
Expand All @@ -35,10 +31,6 @@ type PeerConnection struct {
versionProvider ProtocolVersionProvider
}

func (p *PeerConnection) requestBlocks() {
p.requestBlockChan <- signalRequestBlocks{}
}

func (p *PeerConnection) handshake(ctx context.Context) error {
// Check Peer's protocol version
version, err := p.versionProvider.GetProtocolVersion(ctx, p.id)
Expand Down Expand Up @@ -97,7 +89,7 @@ func (p *PeerConnection) handshake(ctx context.Context) error {
return nil
}

func (p *PeerConnection) handleRequestBlocks(ctx context.Context) error {
func (p *PeerConnection) requestSyncBlocks(ctx context.Context) error {
// Get my last irreversible block
lib := p.libProvider.GetLastIrreversibleBlock()

Expand Down Expand Up @@ -200,26 +192,28 @@ func (p *PeerConnection) handleRequestBlocks(ctx context.Context) error {

func (p *PeerConnection) connectionLoop(ctx context.Context) {
for {
// Request sync blocks.
// If there is an error, report it
err := p.requestSyncBlocks(ctx)
if err != nil {
select {
case p.peerErrorChan <- PeerError{id: p.id, err: err}:
case <-ctx.Done():
return
}
}

// Get sleep time if we are synced or not
sleepTime := time.Second
if p.isSynced {
sleepTime = p.opts.SyncedSleepTime
}

// Sleep and then repeat
select {
case <-time.After(sleepTime):
case <-ctx.Done():
return
case <-p.requestBlockChan:
err := p.handleRequestBlocks(ctx)
if err != nil {
go time.AfterFunc(time.Second, p.requestBlocks)
go func() {
select {
case p.peerErrorChan <- PeerError{id: p.id, err: err}:
case <-ctx.Done():
}
}()
} else {
if p.isSynced {
go time.AfterFunc(p.opts.SyncedPingTime, p.requestBlocks)
} else {
go p.requestBlocks()
}
}
}
}
}
Expand All @@ -240,7 +234,6 @@ func (p *PeerConnection) Start(ctx context.Context) {
}()
} else {
go p.connectionLoop(ctx)
go p.requestBlocks()
return
}
select {
Expand All @@ -263,15 +256,14 @@ func NewPeerConnection(
applicator *Applicator,
versionProvider ProtocolVersionProvider) *PeerConnection {
return &PeerConnection{
id: id,
isSynced: false,
opts: opts,
requestBlockChan: make(chan signalRequestBlocks),
libProvider: libProvider,
localRPC: localRPC,
peerRPC: peerRPC,
applicator: applicator,
peerErrorChan: peerErrorChan,
versionProvider: versionProvider,
id: id,
isSynced: false,
opts: opts,
libProvider: libProvider,
localRPC: localRPC,
peerRPC: peerRPC,
applicator: applicator,
peerErrorChan: peerErrorChan,
versionProvider: versionProvider,
}
}