From 9a2dfe638ac663b3b3161139737360ccf77bfbf2 Mon Sep 17 00:00:00 2001 From: Hunk Zhu Date: Fri, 27 Oct 2023 06:39:43 +0000 Subject: [PATCH] fix issue 3100 --- contrib/registry/nacos/nacos_watcher.go | 21 +++++---------------- contrib/registry/nacos/nacos_z_test.go | 22 +++++++++++++++++----- 2 files changed, 22 insertions(+), 21 deletions(-) diff --git a/contrib/registry/nacos/nacos_watcher.go b/contrib/registry/nacos/nacos_watcher.go index 782e1dc9643..ba08596f312 100644 --- a/contrib/registry/nacos/nacos_watcher.go +++ b/contrib/registry/nacos/nacos_watcher.go @@ -38,23 +38,12 @@ func newWatcher(ctx context.Context) *Watcher { // Proceed proceeds watch in blocking way. // It returns all complete services that watched by `key` if any change. func (w *Watcher) Proceed() (services []gsvc.Service, err error) { - n := len(w.event) - servicesMap := map[string]gsvc.Service{} - for i := 0; i < n; i++ { - e := <-w.event - if e.Err != nil { - err = e.Err - return - } - newServices := NewServicesFromInstances(e.Services) - for _, s := range newServices { - servicesMap[s.GetName()] = s - } - } - services = make([]gsvc.Service, 0, len(servicesMap)) - for _, s := range servicesMap { - services = append(services, s) + e := <-w.event + if e.Err != nil { + err = e.Err + return } + services = NewServicesFromInstances(e.Services) return } diff --git a/contrib/registry/nacos/nacos_z_test.go b/contrib/registry/nacos/nacos_z_test.go index 8e5a7e1bc71..138a9a34f35 100644 --- a/contrib/registry/nacos/nacos_z_test.go +++ b/contrib/registry/nacos/nacos_z_test.go @@ -7,10 +7,13 @@ package nacos_test import ( + "context" + "sync/atomic" "testing" "time" "github.com/gogf/gf/contrib/registry/nacos/v2" + "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/net/gsvc" "github.com/gogf/gf/v2/os/gctx" "github.com/gogf/gf/v2/test/gtest" @@ -121,9 +124,19 @@ func TestWatch(t *testing.T) { }) gtest.C(t, func(t *gtest.T) { + ctx := gctx.New() watcher, err := registry.Watch(ctx, svc1.GetPrefix()) t.AssertNil(err) + var latestProceedResult atomic.Value + g.Go(ctx, func(ctx context.Context) { + res, err := watcher.Proceed() + t.AssertNil(err) + latestProceedResult.Store(res) + }, func(ctx context.Context, exception error) { + t.Fatal(exception) + }) + // Register another service. svc2 := &gsvc.LocalService{ Name: svc1.Name, @@ -140,9 +153,8 @@ func TestWatch(t *testing.T) { // Watch and retrieve the service changes: // svc1 and svc2 is the same service name, which has 2 endpoints. - proceedResult, err := watcher.Proceed() - - t.AssertNil(err) + proceedResult, ok := latestProceedResult.Load().([]gsvc.Service) + t.Assert(ok, true) t.Assert(len(proceedResult), 1) t.Assert( sortEndpoints(proceedResult[0].GetEndpoints()), @@ -155,8 +167,8 @@ func TestWatch(t *testing.T) { t.AssertNil(err) time.Sleep(time.Second * 10) - proceedResult, err = watcher.Proceed() - t.AssertNil(err) + proceedResult, ok = latestProceedResult.Load().([]gsvc.Service) + t.Assert(ok, true) t.Assert(len(proceedResult), 1) t.Assert( sortEndpoints(proceedResult[0].GetEndpoints()),