From 40e2511dab81983ec78b9b3eff84500d585b302e Mon Sep 17 00:00:00 2001 From: Marco Iorio Date: Wed, 3 Jul 2024 17:27:43 +0200 Subject: [PATCH] cmapisrv-mock: wait for full synchronization before churn phase Wait for synchronization completion in all mocked clusters, and for all resources before starting the churn phase, to avoid unnecessarily consuming rate limiter slots before turning ready. Signed-off-by: Marco Iorio --- cmapisrv-mock/internal/mocker/cluster.go | 12 ++++++------ cmapisrv-mock/internal/mocker/syncer.go | 17 ++++++++++++----- 2 files changed, 18 insertions(+), 11 deletions(-) diff --git a/cmapisrv-mock/internal/mocker/cluster.go b/cmapisrv-mock/internal/mocker/cluster.go index 6c846920..bc3c49fe 100644 --- a/cmapisrv-mock/internal/mocker/cluster.go +++ b/cmapisrv-mock/internal/mocker/cluster.go @@ -51,7 +51,7 @@ func (cls clusters) Run(ctx context.Context, ss syncstate.SyncState) { for _, cl := range cls.cls { synced := ss.WaitForResource() go func(cl cluster) { - cl.Run(ctx, cls.cfg, synced) + cl.Run(ctx, cls.cfg, synced, ss.WaitChannel()) wg.Done() }(cl) } @@ -95,7 +95,7 @@ func newCluster(log logrus.FieldLogger, cp cparams) cluster { return cl } -func (cl *cluster) Run(ctx context.Context, cfg config, synced func(context.Context)) { +func (cl *cluster) Run(ctx context.Context, cfg config, synced func(context.Context), allSynced <-chan struct{}) { var wg sync.WaitGroup cl.log.Info("Starting cluster") @@ -103,19 +103,19 @@ func (cl *cluster) Run(ctx context.Context, cfg config, synced func(context.Cont wg.Add(1) go func() { - cl.nodes.Run(ctx, cfg.Nodes, rate.Limit(cfg.NodesQPS)) + cl.nodes.Run(ctx, cfg.Nodes, rate.Limit(cfg.NodesQPS), allSynced) wg.Done() }() wg.Add(1) go func() { - cl.identities.Run(ctx, cfg.Identities, rate.Limit(cfg.IdentitiesQPS)) + cl.identities.Run(ctx, cfg.Identities, rate.Limit(cfg.IdentitiesQPS), allSynced) wg.Done() }() wg.Add(1) go func() { - cl.services.Run(ctx, cfg.Services, rate.Limit(cfg.ServicesQPS)) + cl.services.Run(ctx, cfg.Services, rate.Limit(cfg.ServicesQPS), allSynced) wg.Done() }() @@ -127,7 +127,7 @@ func (cl *cluster) Run(ctx context.Context, cfg config, synced func(context.Cont return } - cl.endpoints.Run(ctx, cfg.Endpoints, rate.Limit(cfg.EndpointsQPS)) + cl.endpoints.Run(ctx, cfg.Endpoints, rate.Limit(cfg.EndpointsQPS), allSynced) }() if cl.nodes.WaitForSync(ctx) != nil || cl.identities.WaitForSync(ctx) != nil || diff --git a/cmapisrv-mock/internal/mocker/syncer.go b/cmapisrv-mock/internal/mocker/syncer.go index e538e599..8f8e6852 100644 --- a/cmapisrv-mock/internal/mocker/syncer.go +++ b/cmapisrv-mock/internal/mocker/syncer.go @@ -31,7 +31,7 @@ func newSyncer[T store.Key](log logrus.FieldLogger, typ string, store store.Sync } } -func (s syncer[T]) Run(ctx context.Context, target uint, qps rate.Limit) { +func (s syncer[T]) Run(ctx context.Context, target uint, qps rate.Limit, allSynced <-chan struct{}) { s.log.Info("Starting synchronization") do := func(obj T, delete bool) { if delete { @@ -59,13 +59,20 @@ func (s syncer[T]) Run(ctx context.Context, target uint, qps rate.Limit) { do(s.next(false)) } - s.store.Synced(ctx, func(context.Context) { close(s.init) }) - if err := s.WaitForSync(ctx); err != nil { + s.store.Synced(ctx, func(context.Context) { + s.log.Info("Initial synchronization completed") + close(s.init) + }) + + select { + case <-ctx.Done(): return + case <-allSynced: + // Wait for synchronization completion in all mocked clusters and for + // all resources before starting the churn phase, to avoid unnecessarily + // consuming rate limiter slots before turning ready. } - s.log.Info("Initial synchronization completed") - rl := rate.NewLimiter(qps, 1) for { if err := rl.Wait(ctx); err != nil {