diff --git a/test/infrastructure/inmemory/pkg/server/api/watch.go b/test/infrastructure/inmemory/pkg/server/api/watch.go index d6a99f5d8ae3..38e3f614e93d 100644 --- a/test/infrastructure/inmemory/pkg/server/api/watch.go +++ b/test/infrastructure/inmemory/pkg/server/api/watch.go @@ -20,7 +20,6 @@ import ( "context" "fmt" "net/http" - "sort" "strconv" "time" @@ -112,55 +111,6 @@ func (h *apiServerHandler) watchForResource(req *restful.Request, resp *restful. return err } - // Get at client to the resource group and list all relevant objects. - inmemoryClient := h.manager.GetResourceGroup(resourceGroup).GetClient() - list, err := h.apiV1list(ctx, req, gvk, inmemoryClient) - if err != nil { - return err - } - - // Sort the objects by resourceVersion to later write the events in order. - sort.SliceStable(list.Items, func(i, j int) bool { - a, err := strconv.ParseUint(list.Items[i].GetResourceVersion(), 10, 64) - if err != nil { - panic(err) - } - b, err := strconv.ParseUint(list.Items[j].GetResourceVersion(), 10, 64) - if err != nil { - panic(err) - } - return a < b - }) - - initialEvents := []Event{} - - // If resourceVersion was not set as query parameter, use 0 to stream all old events. - // Note: This only works because the very first resource version for the in-memory apiserver is 1. - if resourceVersion == "" { - resourceVersion = "0" - } - - parsedResourceVersion, err := strconv.ParseUint(resourceVersion, 10, 64) - if err != nil { - return err - } - - // Loop over all items and fill the list of events with objects which have a newer resourceVersion. - for _, obj := range list.Items { - objResourceVersion, err := strconv.ParseUint(obj.GetResourceVersion(), 10, 64) - if err != nil { - return err - } - if objResourceVersion <= parsedResourceVersion { - continue - } - eventType := watch.Modified - if obj.GetGeneration() == 1 { - eventType = watch.Added - } - initialEvents = append(initialEvents, Event{Type: eventType, Object: &obj}) - } - // Defer cleanup which removes the event handler and ensures the channel is empty of events. defer func() { // Doing this to ensure the channel is empty. @@ -182,6 +132,43 @@ func (h *apiServerHandler) watchForResource(req *restful.Request, resp *restful. // Note: After we removed the handler, no new events will be written to the events channel. }() + // Get at client to the resource group and list all relevant objects. + inmemoryClient := h.manager.GetResourceGroup(resourceGroup).GetClient() + list, err := h.apiV1list(ctx, req, gvk, inmemoryClient) + if err != nil { + return err + } + + // If resourceVersion was set parse to uint64 which is the representation in the simulated apiserver. + var parsedResourceVersion uint64 + if resourceVersion != "" { + parsedResourceVersion, err = strconv.ParseUint(resourceVersion, 10, 64) + if err != nil { + return err + } + } + + initialEvents := []Event{} + + // Loop over all items and fill the list of events with objects which have a newer resourceVersion. + for _, obj := range list.Items { + if resourceVersion != "" { + objResourceVersion, err := strconv.ParseUint(obj.GetResourceVersion(), 10, 64) + if err != nil { + return err + } + if objResourceVersion <= parsedResourceVersion { + continue + } + } + eventType := watch.Modified + // kube-apiserver emits all events as ADDED when no resourceVersion is given. + if obj.GetGeneration() == 1 || resourceVersion == "" { + eventType = watch.Added + } + initialEvents = append(initialEvents, Event{Type: eventType, Object: &obj}) + } + return watcher.Run(ctx, queryTimeout, initialEvents, resp) }