From ab938bfc22127b39dd5bfe64e478274d7c9ea066 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maximilian=20Pa=C3=9F?= <22845248+mpass99@users.noreply.github.com> Date: Tue, 27 Feb 2024 21:42:18 +0100 Subject: [PATCH] Refactor MemoryLeakTestSuite as we identified two issues where the goroutine count from before differs from after the test. 1) It seemed like a Go runtime specific Goroutine appeared in rare cases before the test. To avoid this, we introduced a short timeout before looking up the Goroutines. Another solution might be to do the lookup twice and check if the count matches. 2) A Goroutine that periodically monitors some storage unexpectedly got closed in rare cases. As we could not identify the cause for this, we removed the leaking Goroutines by properly cleaning up. --- cmd/poseidon/main_test.go | 4 +-- internal/api/environments_test.go | 25 ++++++++++++---- internal/api/websocket_test.go | 18 ++++++++---- internal/environment/nomad_manager_test.go | 4 +-- internal/runner/nomad_manager_test.go | 4 +-- tests/util.go | 34 ++++++++++++---------- 6 files changed, 56 insertions(+), 33 deletions(-) diff --git a/cmd/poseidon/main_test.go b/cmd/poseidon/main_test.go index 7f28362b..60e1c0d0 100644 --- a/cmd/poseidon/main_test.go +++ b/cmd/poseidon/main_test.go @@ -51,8 +51,8 @@ func (s *MainTestSuite) TestShutdownOnOSSignal_Profiling() { disableRecovery, cancel := context.WithCancel(context.Background()) cancel() - s.ExpectedGoroutingIncrease++ // The shutdownOnOSSignal waits for an exit after stopping the profiling. - s.ExpectedGoroutingIncrease++ // The shutdownOnOSSignal triggers a os.Signal Goroutine. + s.ExpectedGoroutineIncrease++ // The shutdownOnOSSignal waits for an exit after stopping the profiling. + s.ExpectedGoroutineIncrease++ // The shutdownOnOSSignal triggers a os.Signal Goroutine. server := initServer(initRouter(disableRecovery)) go shutdownOnOSSignal(server, context.Background(), func() { diff --git a/internal/api/environments_test.go b/internal/api/environments_test.go index 3c7ccbdd..d49d3929 100644 --- a/internal/api/environments_test.go +++ b/internal/api/environments_test.go @@ -90,14 +90,16 @@ func (s *EnvironmentControllerTestSuite) TestList() { }) s.Run("returns multiple environments", func() { - s.ExpectedGoroutingIncrease++ // We dont care to delete the created environment. - s.ExpectedGoroutingIncrease++ // Also not about the second. + apiMock := &nomad.ExecutorAPIMock{} + apiMock.On("LoadRunnerIDs", mock.AnythingOfType("string")).Return([]string{}, nil) + apiMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil) + var firstEnvironment, secondEnvironment *environment.NomadEnvironment call.Run(func(args mock.Arguments) { - firstEnvironment, err := environment.NewNomadEnvironment(tests.DefaultEnvironmentIDAsInteger, nil, + firstEnvironment, err = environment.NewNomadEnvironment(tests.DefaultEnvironmentIDAsInteger, apiMock, fmt.Sprintf(jobHCLBasicFormat, nomad.TemplateJobID(tests.DefaultEnvironmentIDAsInteger))) s.Require().NoError(err) - secondEnvironment, err := environment.NewNomadEnvironment(tests.DefaultEnvironmentIDAsInteger, nil, + secondEnvironment, err = environment.NewNomadEnvironment(tests.DefaultEnvironmentIDAsInteger, apiMock, fmt.Sprintf(jobHCLBasicFormat, nomad.TemplateJobID(tests.DefaultEnvironmentIDAsInteger))) s.Require().NoError(err) call.ReturnArguments = mock.Arguments{[]runner.ExecutionEnvironment{firstEnvironment, secondEnvironment}, nil} @@ -114,6 +116,11 @@ func (s *EnvironmentControllerTestSuite) TestList() { environments, ok := environmentsInterface.([]interface{}) s.Require().True(ok) s.Equal(2, len(environments)) + + err = firstEnvironment.Delete(tests.ErrCleanupDestroyReason) + s.NoError(err) + err = secondEnvironment.Delete(tests.ErrCleanupDestroyReason) + s.NoError(err) }) } @@ -155,10 +162,13 @@ func (s *EnvironmentControllerTestSuite) TestGet() { s.manager.Calls = []mock.Call{} s.Run("returns environment", func() { - s.ExpectedGoroutingIncrease++ // We dont care to delete the created environment. + apiMock := &nomad.ExecutorAPIMock{} + apiMock.On("LoadRunnerIDs", mock.AnythingOfType("string")).Return([]string{}, nil) + apiMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil) + var testEnvironment *environment.NomadEnvironment call.Run(func(args mock.Arguments) { - testEnvironment, err := environment.NewNomadEnvironment(tests.DefaultEnvironmentIDAsInteger, nil, + testEnvironment, err = environment.NewNomadEnvironment(tests.DefaultEnvironmentIDAsInteger, apiMock, fmt.Sprintf(jobHCLBasicFormat, nomad.TemplateJobID(tests.DefaultEnvironmentIDAsInteger))) s.Require().NoError(err) call.ReturnArguments = mock.Arguments{testEnvironment, nil} @@ -176,6 +186,9 @@ func (s *EnvironmentControllerTestSuite) TestGet() { idFloat, ok := idInterface.(float64) s.Require().True(ok) s.Equal(tests.DefaultEnvironmentIDAsInteger, int(idFloat)) + + err = testEnvironment.Delete(tests.ErrCleanupDestroyReason) + s.NoError(err) }) } diff --git a/internal/api/websocket_test.go b/internal/api/websocket_test.go index 29191d53..76274977 100644 --- a/internal/api/websocket_test.go +++ b/internal/api/websocket_test.go @@ -294,7 +294,8 @@ func (s *MainTestSuite) TestWebsocketTLS() { func (s *MainTestSuite) TestWebSocketProxyStopsReadingTheWebSocketAfterClosingIt() { apiMock := &nomad.ExecutorAPIMock{} executionID := tests.DefaultExecutionID - r, wsURL := newRunnerWithNotMockedRunnerManager(s, apiMock, executionID) + r, wsURL, cleanup := newRunnerWithNotMockedRunnerManager(s, apiMock, executionID) + defer cleanup() logger, hook := test.NewNullLogger() log = logger.WithField("pkg", "api") @@ -329,9 +330,10 @@ func newNomadAllocationWithMockedAPIClient(runnerID string) (runner.Runner, *nom } func newRunnerWithNotMockedRunnerManager(s *MainTestSuite, apiMock *nomad.ExecutorAPIMock, executionID string) ( - r runner.Runner, wsURL *url.URL) { + r runner.Runner, wsURL *url.URL, cleanup func()) { s.T().Helper() apiMock.On("MarkRunnerAsUsed", mock.AnythingOfType("string"), mock.AnythingOfType("int")).Return(nil) + apiMock.On("LoadRunnerIDs", mock.AnythingOfType("string")).Return([]string{}, nil) apiMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil) apiMock.On("RegisterRunnerJob", mock.AnythingOfType("*api.Job")).Return(nil) call := apiMock.On("WatchEventStream", mock.Anything, mock.Anything, mock.Anything) @@ -342,13 +344,11 @@ func newRunnerWithNotMockedRunnerManager(s *MainTestSuite, apiMock *nomad.Execut runnerManager := runner.NewNomadRunnerManager(apiMock, s.TestCtx) router := NewRouter(runnerManager, nil) - s.ExpectedGoroutingIncrease++ // We don't care about closing the server at this point. + s.ExpectedGoroutineIncrease++ // The server is not closing properly. Therefore, we don't even try. server := httptest.NewServer(router) runnerID := tests.DefaultRunnerID - s.ExpectedGoroutingIncrease++ // We don't care about removing the runner at this place. runnerJob := runner.NewNomadJob(runnerID, nil, apiMock, nil) - s.ExpectedGoroutingIncrease++ // We don't care about removing the environment at this place. e, err := environment.NewNomadEnvironment(0, apiMock, "job \"template-0\" {}") s.Require().NoError(err) eID, err := nomad.EnvironmentIDFromRunnerID(runnerID) @@ -362,7 +362,13 @@ func newRunnerWithNotMockedRunnerManager(s *MainTestSuite, apiMock *nomad.Execut s.Require().NoError(err) wsURL, err = webSocketURL("ws", server, router, r.ID(), executionID) s.Require().NoError(err) - return r, wsURL + + return r, wsURL, func() { + err = r.Destroy(tests.ErrCleanupDestroyReason) + s.NoError(err) + err = e.Delete(tests.ErrCleanupDestroyReason) + s.NoError(err) + } } func webSocketURL(scheme string, server *httptest.Server, router *mux.Router, diff --git a/internal/environment/nomad_manager_test.go b/internal/environment/nomad_manager_test.go index aefcd5e4..040f73a6 100644 --- a/internal/environment/nomad_manager_test.go +++ b/internal/environment/nomad_manager_test.go @@ -59,7 +59,7 @@ func (s *CreateOrUpdateTestSuite) TestReturnsErrorIfCreatesOrUpdateEnvironmentRe s.apiMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil) s.runnerManagerMock.On("GetEnvironment", mock.AnythingOfType("dto.EnvironmentID")).Return(nil, false) s.runnerManagerMock.On("StoreEnvironment", mock.AnythingOfType("*environment.NomadEnvironment")).Return(true) - s.ExpectedGoroutingIncrease++ // We don't care about removing the created environment. + s.ExpectedGoroutineIncrease++ // We don't care about removing the created environment. _, err := s.manager.CreateOrUpdate( dto.EnvironmentID(tests.DefaultEnvironmentIDAsInteger), s.request, context.Background()) s.ErrorIs(err, tests.ErrDefault) @@ -89,7 +89,7 @@ func (s *CreateOrUpdateTestSuite) TestCreateOrUpdatesSetsForcePullFlag() { call.ReturnArguments = mock.Arguments{nil} }) - s.ExpectedGoroutingIncrease++ // We dont care about removing the created environment at this point. + s.ExpectedGoroutineIncrease++ // We dont care about removing the created environment at this point. _, err := s.manager.CreateOrUpdate( dto.EnvironmentID(tests.DefaultEnvironmentIDAsInteger), s.request, context.Background()) s.NoError(err) diff --git a/internal/runner/nomad_manager_test.go b/internal/runner/nomad_manager_test.go index 8e3d515f..6b877acf 100644 --- a/internal/runner/nomad_manager_test.go +++ b/internal/runner/nomad_manager_test.go @@ -528,7 +528,7 @@ func (s *MainTestSuite) TestNomadRunnerManager_Load() { jobID := tests.DefaultRunnerID job.ID = &jobID job.Name = &jobID - s.ExpectedGoroutingIncrease++ // We dont care about destroying the created runner. + s.ExpectedGoroutineIncrease++ // We dont care about destroying the created runner. call.Return([]*nomadApi.Job{job}, nil) runnerManager.Load() @@ -544,7 +544,7 @@ func (s *MainTestSuite) TestNomadRunnerManager_Load() { configTaskGroup := nomad.FindTaskGroup(job, nomad.ConfigTaskGroupName) s.Require().NotNil(configTaskGroup) configTaskGroup.Meta[nomad.ConfigMetaUsedKey] = nomad.ConfigMetaUsedValue - s.ExpectedGoroutingIncrease++ // We don't care about destroying the created runner. + s.ExpectedGoroutineIncrease++ // We don't care about destroying the created runner. call.Return([]*nomadApi.Job{job}, nil) s.Require().Zero(runnerManager.usedRunners.Length()) diff --git a/tests/util.go b/tests/util.go index 5e99c4bc..4d8030fc 100644 --- a/tests/util.go +++ b/tests/util.go @@ -31,31 +31,35 @@ var numGoroutines = regexp.MustCompile(`^goroutine profile: total (\d*)\n`) // Be aware not to overwrite the SetupTest or TearDownTest function! type MemoryLeakTestSuite struct { suite.Suite - ExpectedGoroutingIncrease int + ExpectedGoroutineIncrease int TestCtx context.Context testCtxCancel context.CancelFunc goroutineCountBefore int goroutinesBefore *bytes.Buffer } -func (s *MemoryLeakTestSuite) SetupTest() { - // Without this first line we observed some goroutines just closing. - runtime.Gosched() - s.ExpectedGoroutingIncrease = 0 - s.goroutinesBefore = &bytes.Buffer{} - - err := pprof.Lookup("goroutine").WriteTo(s.goroutinesBefore, 1) +func (s *MemoryLeakTestSuite) lookupGoroutines() (debugOutput *bytes.Buffer, goroutineCount int) { + debugOutput = &bytes.Buffer{} + err := pprof.Lookup("goroutine").WriteTo(debugOutput, 1) s.Require().NoError(err) - match := numGoroutines.FindSubmatch(s.goroutinesBefore.Bytes()) + match := numGoroutines.FindSubmatch(debugOutput.Bytes()) if match == nil { - s.Fail("gouroutines could not be parsed: " + s.goroutinesBefore.String()) + s.Fail("gouroutines could not be parsed: " + debugOutput.String()) } // We do not use runtime.NumGoroutine() to not create inconsistency to the Lookup. - s.goroutineCountBefore, err = strconv.Atoi(string(match[1])) + goroutineCount, err = strconv.Atoi(string(match[1])) if err != nil { s.Fail("number of goroutines could not be parsed: " + err.Error()) } + return debugOutput, goroutineCount +} + +func (s *MemoryLeakTestSuite) SetupTest() { + runtime.Gosched() // Flush done Goroutines + <-time.After(ShortTimeout) // Just to make sure + s.ExpectedGoroutineIncrease = 0 + s.goroutinesBefore, s.goroutineCountBefore = s.lookupGoroutines() ctx, cancel := context.WithCancel(context.Background()) s.TestCtx = ctx @@ -66,13 +70,13 @@ func (s *MemoryLeakTestSuite) TearDownTest() { s.testCtxCancel() runtime.Gosched() // Flush done Goroutines <-time.After(ShortTimeout) // Just to make sure - goroutinesAfter := runtime.NumGoroutine() - s.Equal(s.goroutineCountBefore+s.ExpectedGoroutingIncrease, goroutinesAfter) - if s.goroutineCountBefore+s.ExpectedGoroutingIncrease != goroutinesAfter { + goroutinesAfter, goroutineCountAfter := s.lookupGoroutines() + s.Equal(s.goroutineCountBefore+s.ExpectedGoroutineIncrease, goroutineCountAfter) + if s.goroutineCountBefore+s.ExpectedGoroutineIncrease != goroutineCountAfter { _, err := io.Copy(os.Stderr, s.goroutinesBefore) s.NoError(err) - err = pprof.Lookup("goroutine").WriteTo(os.Stderr, 1) + _, err = io.Copy(os.Stderr, goroutinesAfter) s.NoError(err) } }