Skip to content

Commit

Permalink
Terminally exit on unrecoverable exceptions for RCI.
Browse files Browse the repository at this point in the history
  • Loading branch information
BinBin He committed Jan 1, 2025
1 parent 41d593c commit 25e8e7a
Show file tree
Hide file tree
Showing 3 changed files with 155 additions and 23 deletions.
43 changes: 33 additions & 10 deletions agent/app/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,10 +447,15 @@ func (agent *ecsAgent) doStart(containerChangeEventStream *eventstream.EventStre
// Register the container instance
err = agent.registerContainerInstance(client, vpcSubnetAttributes)
if err != nil {
if isTransient(err) {
return exitcodes.ExitError
if isTerminal(err) {
// On unrecoverable error codes, agent should terminally exit.
logger.Critical("Agent will terminally exit, unable to register container instance:", logger.Fields{
field.Error: err,
})
return exitcodes.ExitTerminal
}
return exitcodes.ExitTerminal
// Other errors are considered recoverable and will be retried.
return exitcodes.ExitError
}

// Load Managed Daemon images asynchronously
Expand Down Expand Up @@ -834,13 +839,19 @@ func (agent *ecsAgent) registerContainerInstance(
field.Error: err,
})
if retriable, ok := err.(apierrors.Retriable); ok && !retriable.Retry() {
return err
return terminalError{err}
}
if utils.IsAWSErrorCodeEqual(err, ecsmodel.ErrCodeInvalidParameterException) {
logger.Critical("Instance registration attempt with an invalid parameter", logger.Fields{
field.Error: err,
})
return err
return terminalError{err}
}
if utils.IsAWSErrorCodeEqual(err, ecsmodel.ErrCodeClientException) {
logger.Critical("Instance registration attempt with client performing invalid action", logger.Fields{
field.Error: err,
})
return terminalError{err}
}
if _, ok := err.(apierrors.AttributeError); ok {
attributeErrorMsg := ""
Expand All @@ -850,9 +861,9 @@ func (agent *ecsAgent) registerContainerInstance(
logger.Critical("Instance registration attempt with invalid attribute(s)", logger.Fields{
field.Error: attributeErrorMsg,
})
return err
return terminalError{err}
}
return transientError{err}
return err
}
logger.Info("Instance registration completed successfully", logger.Fields{
"instanceArn": containerInstanceArn,
Expand Down Expand Up @@ -882,7 +893,19 @@ func (agent *ecsAgent) reregisterContainerInstance(client ecs.ECSClient, capabil
})
if apierrors.IsInstanceTypeChangedError(err) {
seelog.Criticalf(instanceTypeMismatchErrorFormat, err)
return err
return terminalError{err}
}
if utils.IsAWSErrorCodeEqual(err, ecsmodel.ErrCodeInvalidParameterException) {
logger.Critical("Instance re-registration attempt with an invalid parameter", logger.Fields{
field.Error: err,
})
return terminalError{err}
}
if utils.IsAWSErrorCodeEqual(err, ecsmodel.ErrCodeClientException) {
logger.Critical("Instance re-registration attempt with client performing invalid action", logger.Fields{
field.Error: err,
})
return terminalError{err}
}
if _, ok := err.(apierrors.AttributeError); ok {
attributeErrorMsg := ""
Expand All @@ -892,9 +915,9 @@ func (agent *ecsAgent) reregisterContainerInstance(client ecs.ECSClient, capabil
logger.Critical("Instance re-registration attempt with invalid attribute(s)", logger.Fields{
field.Error: attributeErrorMsg,
})
return err
return terminalError{err}
}
return transientError{err}
return err
}

// startAsyncRoutines starts all background methods
Expand Down
122 changes: 115 additions & 7 deletions agent/app/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -795,7 +795,7 @@ func TestNewTaskEngineRestoreFromCheckpointNewStateManagerError(t *testing.T) {
_, _, err := agent.newTaskEngine(eventstream.NewEventStream("events", ctx),
credentialsManager, dockerstate.NewTaskEngineState(), imageManager, hostResources, execCmdMgr, serviceConnectManager, daemonManagers)
assert.Error(t, err)
assert.False(t, isTransient(err))
assert.False(t, isTerminal(err))
}

func TestNewTaskEngineRestoreFromCheckpointStateLoadError(t *testing.T) {
Expand Down Expand Up @@ -844,7 +844,7 @@ func TestNewTaskEngineRestoreFromCheckpointStateLoadError(t *testing.T) {
_, _, err := agent.newTaskEngine(eventstream.NewEventStream("events", ctx),
credentialsManager, dockerstate.NewTaskEngineState(), imageManager, hostResources, execCmdMgr, serviceConnectManager, daemonManagers)
assert.Error(t, err)
assert.False(t, isTransient(err))
assert.False(t, isTerminal(err))
}

func TestNewTaskEngineRestoreFromCheckpoint(t *testing.T) {
Expand Down Expand Up @@ -1086,7 +1086,7 @@ func TestReregisterContainerInstanceInstanceTypeChanged(t *testing.T) {

err := agent.registerContainerInstance(client, nil)
assert.Error(t, err)
assert.False(t, isTransient(err))
assert.True(t, isTerminal(err))
}

func TestReregisterContainerInstanceAttributeError(t *testing.T) {
Expand Down Expand Up @@ -1204,7 +1204,7 @@ func TestReregisterContainerInstanceNonTerminalError(t *testing.T) {

err := agent.registerContainerInstance(client, nil)
assert.Error(t, err)
assert.True(t, isTransient(err))
assert.False(t, isTerminal(err))
}

func TestRegisterContainerInstanceWhenContainerInstanceARNIsNotSetHappyPath(t *testing.T) {
Expand Down Expand Up @@ -1320,7 +1320,7 @@ func TestRegisterContainerInstanceWhenContainerInstanceARNIsNotSetCanRetryError(

err := agent.registerContainerInstance(client, nil)
assert.Error(t, err)
assert.True(t, isTransient(err))
assert.False(t, isTerminal(err))
}

func TestRegisterContainerInstanceWhenContainerInstanceARNIsNotSetCannotRetryError(t *testing.T) {
Expand Down Expand Up @@ -1378,7 +1378,7 @@ func TestRegisterContainerInstanceWhenContainerInstanceARNIsNotSetCannotRetryErr

err := agent.registerContainerInstance(client, nil)
assert.Error(t, err)
assert.False(t, isTransient(err))
assert.True(t, isTerminal(err))
}

func TestRegisterContainerInstanceWhenContainerInstanceARNIsNotSetAttributeError(t *testing.T) {
Expand Down Expand Up @@ -1435,7 +1435,7 @@ func TestRegisterContainerInstanceWhenContainerInstanceARNIsNotSetAttributeError

err := agent.registerContainerInstance(client, nil)
assert.Error(t, err)
assert.False(t, isTransient(err))
assert.True(t, isTerminal(err))
}

func TestRegisterContainerInstanceInvalidParameterTerminalError(t *testing.T) {
Expand Down Expand Up @@ -1499,6 +1499,114 @@ func TestRegisterContainerInstanceInvalidParameterTerminalError(t *testing.T) {
credentialsManager, state, imageManager, client, execCmdMgr)
assert.Equal(t, exitcodes.ExitTerminal, exitCode)
}

func TestRegisterContainerInstanceExceptionErrors(t *testing.T) {
testCases := []struct {
name string
regError error
exitCode int
}{
{
name: "InvalidParameterException",
regError: awserr.New("InvalidParameterException", "", nil),
exitCode: exitcodes.ExitTerminal,
},
{
name: "ClientException",
regError: awserr.New("ClientException", "", nil),
exitCode: exitcodes.ExitTerminal,
},
{
name: "ThrottlingException",
regError: awserr.New("ThrottlingException", "", nil),
exitCode: exitcodes.ExitError,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ctrl, credentialsManager, state, imageManager, client,
dockerClient, _, _, execCmdMgr, _ := setup(t)
defer ctrl.Finish()

mockCredentialsProvider := app_mocks.NewMockCredentialsProvider(ctrl)
mockMobyPlugins := mock_mobypkgwrapper.NewMockPlugins(ctrl)
mockEC2Metadata := mock_ec2.NewMockEC2MetadataClient(ctrl)
mockPauseLoader := mock_loader.NewMockLoader(ctrl)
mockServiceConnectManager := mock_serviceconnect.NewMockManager(ctrl)
mockDaemonManager := mock_daemonmanager.NewMockDaemonManager(ctrl)
mockDaemonManagers := map[string]dm.DaemonManager{md.EbsCsiDriver: mockDaemonManager}

mockPauseLoader.EXPECT().IsLoaded(gomock.Any()).Return(false, nil).AnyTimes()
mockPauseLoader.EXPECT().LoadImage(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes()

mockEC2Metadata.EXPECT().PrimaryENIMAC().Return("mac", nil)
mockEC2Metadata.EXPECT().VPCID("mac").Return("vpc-id", nil)
mockEC2Metadata.EXPECT().SubnetID("mac").Return("subnet-id", nil)
mockEC2Metadata.EXPECT().OutpostARN().Return("", nil)

mockServiceConnectManager.EXPECT().IsLoaded(gomock.Any()).Return(true, nil).AnyTimes()
mockServiceConnectManager.EXPECT().GetLoadedAppnetVersion().AnyTimes()
mockServiceConnectManager.EXPECT().GetCapabilitiesForAppnetInterfaceVersion("").AnyTimes()
mockServiceConnectManager.EXPECT().SetECSClient(gomock.Any(), gomock.Any()).AnyTimes()

mockDaemonManager.EXPECT().IsLoaded(gomock.Any()).Return(true, nil).AnyTimes()
mockDaemonManager.EXPECT().LoadImage(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes()

dockerClient.EXPECT().SupportedVersions().Return(apiVersions).AnyTimes()

gomock.InOrder(
client.EXPECT().GetHostResources().Return(testHostResource, nil),
mockCredentialsProvider.EXPECT().Retrieve(gomock.Any()).Return(awsv2.Credentials{}, nil),
mockMobyPlugins.EXPECT().Scan().AnyTimes().Return([]string{}, nil),
dockerClient.EXPECT().ListPluginsWithFilters(gomock.Any(), gomock.Any(), gomock.Any(),
gomock.Any()).AnyTimes().Return([]string{}, nil),

client.EXPECT().
RegisterContainerInstance(
gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(),
gomock.Any(), gomock.Any(),
).
Return("", "", tc.regError),
)

cfg := getTestConfig()
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()

agent := &ecsAgent{
ctx: ctx,
ec2MetadataClient: mockEC2Metadata,
cfg: &cfg,
pauseLoader: mockPauseLoader,
credentialsCache: awsv2.NewCredentialsCache(mockCredentialsProvider),
dockerClient: dockerClient,
mobyPlugins: mockMobyPlugins,
terminationHandler: func(
taskEngineState dockerstate.TaskEngineState,
dataClient data.Client,
taskEngine engine.TaskEngine,
cancel context.CancelFunc,
) {
},
serviceconnectManager: mockServiceConnectManager,
daemonManagers: mockDaemonManagers,
}

exitCode := agent.doStart(
eventstream.NewEventStream("events", ctx),
credentialsManager,
state,
imageManager,
client,
execCmdMgr,
)

assert.Equal(t, tc.exitCode, exitCode)
})
}
}

func TestMergeTags(t *testing.T) {
ec2Key := "ec2Key"
ec2Value := "ec2Value"
Expand Down
13 changes: 7 additions & 6 deletions agent/app/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,16 @@

package app

// transientError represents a transient error when executing the ECS Agent
type transientError struct {
type terminalError struct {
error
}

// isTransient returns true if the error is transient
func isTransient(err error) bool {
_, ok := err.(transientError)
return ok
// isTerminal returns true if the error is already wrapped as an unrecoverable condition
// which will allow agent to exit terminally.
func isTerminal(err error) bool {
// Check if the error is already wrapped as a terminalError
_, terminal := err.(terminalError)
return terminal
}

// clusterMismatchError represents a mismatch in cluster name between the
Expand Down

0 comments on commit 25e8e7a

Please sign in to comment.