Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync2: multipeer: fix edge cases #6447

Open
wants to merge 2 commits into
base: sync2/dbset-conns
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 26 additions & 4 deletions sync2/multipeer/multipeer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
"errors"
"fmt"
"math"
"math/rand/v2"
"sync/atomic"
"time"

"github.com/jonboulle/clockwork"
Expand Down Expand Up @@ -64,6 +66,9 @@
MinCompleteFraction float64 `mapstructure:"min-complete-fraction"`
// Interval between syncs.
SyncInterval time.Duration `mapstructure:"sync-interval"`
// Interval spread factor for split sync.
// The actual interval will be SyncInterval * (1 + (random[0..2]*SplitSyncIntervalSpread-1)).
SyncIntervalSpread float64 `mapstructure:"sync-interval-spread"`
// Interval between retries after a failed sync.
RetryInterval time.Duration `mapstructure:"retry-interval"`
// Interval between rechecking for peers after no synchronization peers were
Expand Down Expand Up @@ -91,6 +96,7 @@
MaxFullDiff: 10000,
MaxSyncDiff: 100,
SyncInterval: 5 * time.Minute,
SyncIntervalSpread: 0.5,
RetryInterval: 1 * time.Minute,
NoPeersRecheckInterval: 30 * time.Second,
SplitSyncGracePeriod: time.Minute,
Expand Down Expand Up @@ -259,20 +265,27 @@
}

func (mpr *MultiPeerReconciler) fullSync(ctx context.Context, syncPeers []p2p.Peer) error {
if len(syncPeers) == 0 {
return errors.New("no peers to sync against")
}

Check warning on line 270 in sync2/multipeer/multipeer.go

View check run for this annotation

Codecov / codecov/patch

sync2/multipeer/multipeer.go#L269-L270

Added lines #L269 - L270 were not covered by tests
var eg errgroup.Group
var someSucceeded atomic.Bool
for _, p := range syncPeers {
eg.Go(func() error {
if err := mpr.syncBase.WithPeerSyncer(ctx, p, func(ps PeerSyncer) error {
err := ps.Sync(ctx, nil, nil)
switch {
case err == nil:
someSucceeded.Store(true)
mpr.sl.NoteSync()
case errors.Is(err, context.Canceled):
return err
default:
// failing to sync against a particular peer is not considered
// a fatal sync failure, so we just log the error
mpr.logger.Error("error syncing peer", zap.Stringer("peer", p), zap.Error(err))
mpr.logger.Error("error syncing peer",
zap.Stringer("peer", p),
zap.Error(err))
}
return nil
}); err != nil {
Expand All @@ -281,7 +294,13 @@
return nil
})
}
return eg.Wait()
if err := eg.Wait(); err != nil {
return err
}

Check warning on line 299 in sync2/multipeer/multipeer.go

View check run for this annotation

Codecov / codecov/patch

sync2/multipeer/multipeer.go#L298-L299

Added lines #L298 - L299 were not covered by tests
if !someSucceeded.Load() {
return errors.New("all syncs failed")
}
return nil
}

func (mpr *MultiPeerReconciler) syncOnce(ctx context.Context, lastWasSplit bool) (full bool, err error) {
Expand Down Expand Up @@ -341,7 +360,7 @@
}

// Run runs the MultiPeerReconciler.
func (mpr *MultiPeerReconciler) Run(ctx context.Context) error {
func (mpr *MultiPeerReconciler) Run(ctx context.Context, kickCh chan struct{}) error {
// The point of using split sync, which syncs different key ranges against
// different peers, vs full sync which syncs the full key range against different
// peers, is:
Expand Down Expand Up @@ -379,7 +398,9 @@
lastWasSplit := false
LOOP:
for {
interval := mpr.cfg.SyncInterval
interval := time.Duration(
float64(mpr.cfg.SyncInterval) *
(1 + mpr.cfg.SyncIntervalSpread*(rand.Float64()*2-1)))
full, err = mpr.syncOnce(ctx, lastWasSplit)
if err != nil {
if errors.Is(err, context.Canceled) {
Expand All @@ -402,6 +423,7 @@
err = ctx.Err()
break LOOP
case <-mpr.clock.After(interval):
case <-kickCh:
}
}
// The loop is only exited upon context cancellation.
Expand Down
64 changes: 61 additions & 3 deletions sync2/multipeer/multipeer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ type multiPeerSyncTester struct {
reconciler *multipeer.MultiPeerReconciler
cancel context.CancelFunc
eg errgroup.Group
kickCh chan struct{}
// EXPECT() calls should not be done concurrently
// https://github.com/golang/mock/issues/533#issuecomment-821537840
mtx sync.Mutex
Expand All @@ -80,10 +81,13 @@ func newMultiPeerSyncTester(t *testing.T, addPeers int) *multiPeerSyncTester {
syncRunner: NewMocksyncRunner(ctrl),
peers: peers.New(),
clock: clockwork.NewFakeClock().(fakeClock),
kickCh: make(chan struct{}, 1),
}
cfg := multipeer.DefaultConfig()
cfg.SyncInterval = time.Minute
cfg.SyncInterval = 40 * time.Second
cfg.SyncIntervalSpread = 0.1
cfg.SyncPeerCount = numSyncPeers
cfg.RetryInterval = 5 * time.Second
cfg.MinSplitSyncPeers = 2
cfg.MinSplitSyncCount = 90
cfg.MaxFullDiff = 20
Expand All @@ -110,7 +114,7 @@ func (mt *multiPeerSyncTester) addPeers(n int) []p2p.Peer {
func (mt *multiPeerSyncTester) start() context.Context {
var ctx context.Context
ctx, mt.cancel = context.WithTimeout(context.Background(), 10*time.Second)
mt.eg.Go(func() error { return mt.reconciler.Run(ctx) })
mt.eg.Go(func() error { return mt.reconciler.Run(ctx, mt.kickCh) })
mt.Cleanup(func() {
mt.cancel()
if err := mt.eg.Wait(); err != nil {
Expand All @@ -120,6 +124,10 @@ func (mt *multiPeerSyncTester) start() context.Context {
return ctx
}

func (mt *multiPeerSyncTester) kick() {
mt.kickCh <- struct{}{}
}

func (mt *multiPeerSyncTester) expectProbe(times int, pr rangesync.ProbeResult) *peerList {
var pl peerList
mt.syncBase.EXPECT().Probe(gomock.Any(), gomock.Any()).DoAndReturn(
Expand Down Expand Up @@ -182,7 +190,7 @@ func TestMultiPeerSync(t *testing.T) {
mt := newMultiPeerSyncTester(t, 0)
ctx := mt.start()
mt.clock.BlockUntilContext(ctx, 1)
// Advance by sync interval. No peers yet
// Advance by sync interval (incl. spread). No peers yet
mt.clock.Advance(time.Minute)
mt.clock.BlockUntilContext(ctx, 1)
// It is safe to do EXPECT() calls while the MultiPeerReconciler is blocked
Expand Down Expand Up @@ -248,6 +256,34 @@ func TestMultiPeerSync(t *testing.T) {
mt.syncBase.EXPECT().Wait()
})

t.Run("sync after kick", func(t *testing.T) {
mt := newMultiPeerSyncTester(t, 10)
mt.syncBase.EXPECT().Count().Return(100, nil).AnyTimes()
require.False(t, mt.reconciler.Synced())
expect := func() {
pl := mt.expectProbe(numSyncPeers, rangesync.ProbeResult{
FP: "foo",
Count: 100,
Sim: 0.99, // high enough for full sync
})
mt.expectFullSync(pl, numSyncPeers, 0)
mt.syncBase.EXPECT().Wait()
}
expect()
// first full sync happens immediately
ctx := mt.start()
mt.clock.BlockUntilContext(ctx, 1)
mt.satisfy()
for i := 0; i < numSyncs; i++ {
expect()
mt.kick()
mt.clock.BlockUntilContext(ctx, 1)
mt.satisfy()
}
require.True(t, mt.reconciler.Synced())
mt.syncBase.EXPECT().Wait()
})

t.Run("full sync, peers with low count ignored", func(t *testing.T) {
mt := newMultiPeerSyncTester(t, 0)
addedPeers := mt.addPeers(numSyncPeers)
Expand Down Expand Up @@ -348,6 +384,28 @@ func TestMultiPeerSync(t *testing.T) {
mt.syncBase.EXPECT().Wait()
})

t.Run("all peers failed during full sync", func(t *testing.T) {
mt := newMultiPeerSyncTester(t, 10)
mt.syncBase.EXPECT().Count().Return(100, nil).AnyTimes()

pl := mt.expectProbe(numSyncPeers, rangesync.ProbeResult{FP: "foo", Count: 100, Sim: 0.99})
mt.expectFullSync(pl, numSyncPeers, numSyncPeers)
mt.syncBase.EXPECT().Wait().AnyTimes()

ctx := mt.start()
mt.clock.BlockUntilContext(ctx, 1)
mt.satisfy()

pl = mt.expectProbe(numSyncPeers, rangesync.ProbeResult{FP: "foo", Count: 100, Sim: 0.99})
mt.expectFullSync(pl, numSyncPeers, 0)
// Retry should happen after mere 5 seconds as no peers have succeeded, no
// need to wait full sync interval.
mt.clock.Advance(5 * time.Second)
mt.satisfy()

require.True(t, mt.reconciler.Synced())
})

t.Run("failed synced key handling during full sync", func(t *testing.T) {
mt := newMultiPeerSyncTester(t, 10)
mt.syncBase.EXPECT().Count().Return(100, nil).AnyTimes()
Expand Down
30 changes: 27 additions & 3 deletions sync2/multipeer/split_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"golang.org/x/sync/errgroup"

"github.com/spacemeshos/go-spacemesh/fetch/peers"
"github.com/spacemeshos/go-spacemesh/log"
"github.com/spacemeshos/go-spacemesh/p2p"
)

Expand Down Expand Up @@ -150,6 +151,8 @@
s.syncPeers = append(s.syncPeers, r.ps.Peer())
s.numRemaining--
s.logger.Debug("peer synced successfully",
log.ZShortStringer("x", sr.X),
log.ZShortStringer("y", sr.Y),
zap.Stringer("peer", r.ps.Peer()),
zap.Int("numPeers", s.numPeers),
zap.Int("numRemaining", s.numRemaining),
Expand Down Expand Up @@ -199,8 +202,18 @@
}
select {
case sr = <-s.slowRangeCh:
// push this syncRange to the back of the queue
s.sq.Update(sr, s.clock.Now())
// Push this syncRange to the back of the queue.
// There's some chance that the peer managed to complete
// the sync while the range was still sitting in the
// channel, so we double-check if it's done.
if !sr.Done {
s.logger.Debug("slow peer, reassigning the range",
log.ZShortStringer("x", sr.X), log.ZShortStringer("y", sr.Y))
s.sq.Update(sr, s.clock.Now())
} else {
s.logger.Debug("slow peer, NOT reassigning the range: DONE",
log.ZShortStringer("x", sr.X), log.ZShortStringer("y", sr.Y))
}

Check warning on line 216 in sync2/multipeer/split_sync.go

View check run for this annotation

Codecov / codecov/patch

sync2/multipeer/split_sync.go#L214-L216

Added lines #L214 - L216 were not covered by tests
case <-syncCtx.Done():
return syncCtx.Err()
case r := <-s.resCh:
Expand All @@ -210,5 +223,16 @@
}
}
}
return s.eg.Wait()
// Stop late peers that didn't manage to sync their ranges in time.
// The ranges were already reassigned to other peers and successfully
// synced by this point.
cancel()
err := s.eg.Wait()
if s.numRemaining == 0 {
// If all the ranges are synced, the split sync is considered successful
// even if some peers failed to sync their ranges, so that these ranges
// got synced by other peers.
return nil
}
return err

Check warning on line 237 in sync2/multipeer/split_sync.go

View check run for this annotation

Codecov / codecov/patch

sync2/multipeer/split_sync.go#L237

Added line #L237 was not covered by tests
}
Loading
Loading