cmapisrv-mock: wait for full synchronization before churn phase
Wait for synchronization completion in all mocked clusters, and for all
resources before starting the churn phase, to avoid unnecessarily
consuming rate limiter slots before turning ready.

Signed-off-by: Marco Iorio <[email protected]>
giorio94 committed Jul 4, 2024
1 parent 16a9e10 commit 40e2511
Showing 2 changed files with 18 additions and 11 deletions.
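The change gates each syncer's churn phase behind a fan-in barrier: every resource registers with a shared sync state, reports when its own initial synchronization finishes, and a single channel is closed once all resources in all mocked clusters have reported. Below is a minimal, self-contained Go sketch of that pattern. The method names (WaitForResource, WaitChannel) mirror the ones visible in the diff, but this implementation is an illustrative assumption, not the actual syncstate package.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// syncState counts how many resources still have to finish their initial
// synchronization and broadcasts completion by closing a channel.
type syncState struct {
	mu      sync.Mutex
	pending int
	done    chan struct{}
}

func newSyncState() *syncState {
	return &syncState{done: make(chan struct{})}
}

// WaitForResource registers one more resource and returns the callback it
// must invoke once its initial synchronization has completed.
func (s *syncState) WaitForResource() func(context.Context) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.pending++
	return func(context.Context) {
		s.mu.Lock()
		defer s.mu.Unlock()
		s.pending--
		if s.pending == 0 {
			close(s.done) // broadcast: every registered resource is synced
		}
	}
}

// WaitChannel returns a channel closed once all resources are synced.
func (s *syncState) WaitChannel() <-chan struct{} { return s.done }

func main() {
	ctx := context.Background()
	ss := newSyncState()

	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		synced := ss.WaitForResource() // register before any goroutine can report
		wg.Add(1)
		go func(id int, synced func(context.Context)) {
			defer wg.Done()
			time.Sleep(time.Duration(id+1) * 50 * time.Millisecond) // simulate initial sync
			synced(ctx)
			select {
			case <-ctx.Done():
				return
			case <-ss.WaitChannel(): // gate: proceed only once everyone is synced
			}
			fmt.Printf("syncer %d entering churn phase\n", id)
		}(i, synced)
	}
	wg.Wait()
}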
12 changes: 6 additions & 6 deletions cmapisrv-mock/internal/mocker/cluster.go
@@ -51,7 +51,7 @@ func (cls clusters) Run(ctx context.Context, ss syncstate.SyncState) {
 	for _, cl := range cls.cls {
 		synced := ss.WaitForResource()
 		go func(cl cluster) {
-			cl.Run(ctx, cls.cfg, synced)
+			cl.Run(ctx, cls.cfg, synced, ss.WaitChannel())
 			wg.Done()
 		}(cl)
 	}
@@ -95,27 +95,27 @@ func newCluster(log logrus.FieldLogger, cp cparams) cluster {
 	return cl
 }
 
-func (cl *cluster) Run(ctx context.Context, cfg config, synced func(context.Context)) {
+func (cl *cluster) Run(ctx context.Context, cfg config, synced func(context.Context), allSynced <-chan struct{}) {
 	var wg sync.WaitGroup
 
 	cl.log.Info("Starting cluster")
 	cl.writeClusterConfig(ctx)
 
 	wg.Add(1)
 	go func() {
-		cl.nodes.Run(ctx, cfg.Nodes, rate.Limit(cfg.NodesQPS))
+		cl.nodes.Run(ctx, cfg.Nodes, rate.Limit(cfg.NodesQPS), allSynced)
 		wg.Done()
 	}()
 
 	wg.Add(1)
 	go func() {
-		cl.identities.Run(ctx, cfg.Identities, rate.Limit(cfg.IdentitiesQPS))
+		cl.identities.Run(ctx, cfg.Identities, rate.Limit(cfg.IdentitiesQPS), allSynced)
 		wg.Done()
 	}()
 
 	wg.Add(1)
 	go func() {
-		cl.services.Run(ctx, cfg.Services, rate.Limit(cfg.ServicesQPS))
+		cl.services.Run(ctx, cfg.Services, rate.Limit(cfg.ServicesQPS), allSynced)
 		wg.Done()
 	}()
 
@@ -127,7 +127,7 @@ func (cl *cluster) Run(ctx context.Context, cfg config, synced func(context.Cont
 			return
 		}
 
-		cl.endpoints.Run(ctx, cfg.Endpoints, rate.Limit(cfg.EndpointsQPS))
+		cl.endpoints.Run(ctx, cfg.Endpoints, rate.Limit(cfg.EndpointsQPS), allSynced)
 	}()
 
 	if cl.nodes.WaitForSync(ctx) != nil || cl.identities.WaitForSync(ctx) != nil ||
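The cluster.go side of the change is plumbing: clusters.Run obtains the shared completion channel once via ss.WaitChannel(), and cluster.Run forwards it to the nodes, identities, services, and endpoints syncers, so every resource type in every mocked cluster blocks on the same broadcast before churning.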
17 changes: 12 additions & 5 deletions cmapisrv-mock/internal/mocker/syncer.go
@@ -31,7 +31,7 @@ func newSyncer[T store.Key](log logrus.FieldLogger, typ string, store store.Sync
 	}
 }
 
-func (s syncer[T]) Run(ctx context.Context, target uint, qps rate.Limit) {
+func (s syncer[T]) Run(ctx context.Context, target uint, qps rate.Limit, allSynced <-chan struct{}) {
 	s.log.Info("Starting synchronization")
 	do := func(obj T, delete bool) {
 		if delete {
@@ -59,13 +59,20 @@ func (s syncer[T]) Run(ctx context.Context, target uint, qps rate.Limit) {
 		do(s.next(false))
 	}
 
-	s.store.Synced(ctx, func(context.Context) { close(s.init) })
-	if err := s.WaitForSync(ctx); err != nil {
+	s.store.Synced(ctx, func(context.Context) {
+		s.log.Info("Initial synchronization completed")
+		close(s.init)
+	})
+
+	select {
+	case <-ctx.Done():
 		return
+	case <-allSynced:
+		// Wait for synchronization completion in all mocked clusters and for
+		// all resources before starting the churn phase, to avoid unnecessarily
+		// consuming rate limiter slots before turning ready.
 	}
 
-	s.log.Info("Initial synchronization completed")
-
 	rl := rate.NewLimiter(qps, 1)
 	for {
 		if err := rl.Wait(ctx); err != nil {
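Closing a channel is Go's idiomatic broadcast primitive: every syncer blocked on <-allSynced unblocks at once, with no per-waiter bookkeeping. Note also that the gate sits before rl := rate.NewLimiter(qps, 1) is even created, so a syncer only starts spending its QPS budget on churn once the whole mock is ready, which is exactly the waste the commit message calls out.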
