Skip to content

Commit

Permalink
Addressing TODOs for NodeGetVolumeMetrics and NodeUnstageVolume for E…
Browse files Browse the repository at this point in the history
…BS-backed tasks
  • Loading branch information
mye956 committed Oct 25, 2023
1 parent df18404 commit c15a7a8
Show file tree
Hide file tree
Showing 8 changed files with 264 additions and 71 deletions.
18 changes: 8 additions & 10 deletions agent/engine/task_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,8 @@ func (mtask *managedTask) overseeTask() {

if mtask.Task.IsEBSTaskAttachEnabled() {
csiClient := csiclient.NewCSIClient(filepath.Join(csiclient.DefaultSocketHostPath, csiclient.DefaultImageName, csiclient.DefaultSocketName))
err := mtask.UnstageVolumes(&csiClient)
if err != nil {
errors := mtask.UnstageVolumes(&csiClient)
for _, err := range errors {
logger.Error(fmt.Sprintf("Unable to unstage volumes: %v", err))
}
}
Expand Down Expand Up @@ -1573,11 +1573,12 @@ func (mtask *managedTask) waitForStopReported() bool {
return taskStopped
}

// TODO: Add unit test for UnstageVolumes in the near future
func (mtask *managedTask) UnstageVolumes(csiClient csiclient.CSIClient) error {
func (mtask *managedTask) UnstageVolumes(csiClient csiclient.CSIClient) []error {
errors := make([]error, 0)
task := mtask.Task
if task == nil {
return fmt.Errorf("managed task is nil")
errors = append(errors, fmt.Errorf("managed task is nil"))
return errors
}
if !task.IsEBSTaskAttachEnabled() {
logger.Debug("Task is not EBS-backed. Skip NodeUnstageVolume.")
Expand All @@ -1591,15 +1592,12 @@ func (mtask *managedTask) UnstageVolumes(csiClient csiclient.CSIClient) error {
hostPath := ebsCfg.Source()
err := mtask.unstageVolumeWithTimeout(csiClient, volumeId, hostPath)
if err != nil {
logger.Error("Unable to unstage volume", logger.Fields{
"Task": task.String(),
"Error": err,
})
errors = append(errors, fmt.Errorf("%w; unable to unstage volume for task %s", err, task.String()))
continue
}
}
}
return nil
return errors
}

func (mtask *managedTask) unstageVolumeWithTimeout(csiClient csiclient.CSIClient, volumeId, hostPath string) error {
Expand Down
63 changes: 63 additions & 0 deletions agent/engine/task_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,14 @@ import (
"github.com/aws/amazon-ecs-agent/agent/sighandlers/exitcodes"
"github.com/aws/amazon-ecs-agent/agent/statechange"
"github.com/aws/amazon-ecs-agent/agent/taskresource/volume"
taskresourcevolume "github.com/aws/amazon-ecs-agent/agent/taskresource/volume"
apiresource "github.com/aws/amazon-ecs-agent/ecs-agent/api/attachment/resource"
apicontainerstatus "github.com/aws/amazon-ecs-agent/ecs-agent/api/container/status"
apierrors "github.com/aws/amazon-ecs-agent/ecs-agent/api/errors"
apitaskstatus "github.com/aws/amazon-ecs-agent/ecs-agent/api/task/status"
"github.com/aws/amazon-ecs-agent/ecs-agent/credentials"
mock_credentials "github.com/aws/amazon-ecs-agent/ecs-agent/credentials/mocks"
mock_csiclient "github.com/aws/amazon-ecs-agent/ecs-agent/csiclient/mocks"
"github.com/aws/amazon-ecs-agent/ecs-agent/eventstream"
ni "github.com/aws/amazon-ecs-agent/ecs-agent/netlib/model/networkinterface"
mock_ttime "github.com/aws/amazon-ecs-agent/ecs-agent/utils/ttime/mocks"
Expand Down Expand Up @@ -2227,3 +2230,63 @@ func TestTaskWaitForHostResources(t *testing.T) {
topTask, err = taskEngine.topTask()
assert.Error(t, err)
}

func TestUnstageVolumes(t *testing.T) {
tcs := []struct {
name string
err error
numErrors int
}{
{
name: "Success",
err: nil,
numErrors: 0,
},
{
name: "Failure",
err: errors.New("unable to unstage volume"),
numErrors: 1,
},
{
name: "TimeoutFailure",
err: errors.New("rpc error: code = DeadlineExceeded desc = context deadline exceeded"),
numErrors: 1,
},
}

for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
mtask := &managedTask{
Task: &apitask.Task{
ResourcesMapUnsafe: make(map[string][]taskresource.TaskResource),
DesiredStatusUnsafe: apitaskstatus.TaskRunning,
Volumes: []apitask.TaskVolume{
{
Name: taskresourcevolume.TestVolumeName,
Type: apiresource.EBSTaskAttach,
Volume: &taskresourcevolume.EBSTaskVolumeConfig{
VolumeId: taskresourcevolume.TestVolumeId,
VolumeName: taskresourcevolume.TestVolumeId,
VolumeSizeGib: taskresourcevolume.TestVolumeSizeGib,
SourceVolumeHostPath: taskresourcevolume.TestSourceVolumeHostPath,
DeviceName: taskresourcevolume.TestDeviceName,
FileSystem: taskresourcevolume.TestFileSystem,
},
},
},
},
ctx: ctx,
resourceStateChangeEvent: make(chan resourceStateChange),
}
mockCsiClient := mock_csiclient.NewMockCSIClient(mockCtrl)
mockCsiClient.EXPECT().NodeUnstageVolume(gomock.Any(), "vol-12345", "/mnt/ecs/ebs/taskarn_vol-12345").Return(tc.err).Times(1)

errors := mtask.UnstageVolumes(mockCsiClient)
assert.Len(t, errors, tc.numErrors)
})
}
}
15 changes: 5 additions & 10 deletions agent/stats/engine_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,10 @@ func (engine *DockerStatsEngine) getEBSVolumeMetrics(taskArn string) []*ecstcs.V
func (engine *DockerStatsEngine) fetchEBSVolumeMetrics(task *apitask.Task, taskArn string) []*ecstcs.VolumeMetric {
var metrics []*ecstcs.VolumeMetric
for _, tv := range task.Volumes {
// TODO: Include Getters within the TaskVolume interface so that we don't need to have these type casts.
// (i.e. getVolumeId())
switch tv.Volume.(type) {
case *taskresourcevolume.EBSTaskVolumeConfig:
ebsCfg := tv.Volume.(*taskresourcevolume.EBSTaskVolumeConfig)
volumeId := ebsCfg.VolumeId
hostPath := ebsCfg.Source()
if tv.Volume.GetType() == taskresourcevolume.EBSVolumeType {
volumeId := tv.Volume.GetVolumeId()
hostPath := tv.Volume.Source()
volumeName := tv.Volume.GetVolumeName()
metric, err := engine.getVolumeMetricsWithTimeout(volumeId, hostPath)
if err != nil {
logger.Error("Failed to gather metrics for EBS volume", logger.Fields{
Expand All @@ -77,7 +74,7 @@ func (engine *DockerStatsEngine) fetchEBSVolumeMetrics(task *apitask.Task, taskA
totalBytes := aws.Float64((float64)(metric.Capacity))
metrics = append(metrics, &ecstcs.VolumeMetric{
VolumeId: aws.String(volumeId),
VolumeName: aws.String(ebsCfg.VolumeName),
VolumeName: aws.String(volumeName),
Utilized: &ecstcs.UDoubleCWStatsSet{
Max: usedBytes,
Min: usedBytes,
Expand All @@ -91,8 +88,6 @@ func (engine *DockerStatsEngine) fetchEBSVolumeMetrics(task *apitask.Task, taskA
Sum: totalBytes,
},
})
default:
continue
}
}
return metrics
Expand Down
138 changes: 87 additions & 51 deletions agent/stats/engine_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package stats

import (
"context"
"errors"
"testing"

apicontainer "github.com/aws/amazon-ecs-agent/agent/api/container"
Expand All @@ -30,6 +31,7 @@ import (
apiresource "github.com/aws/amazon-ecs-agent/ecs-agent/api/attachment/resource"
apitaskstatus "github.com/aws/amazon-ecs-agent/ecs-agent/api/task/status"
"github.com/aws/amazon-ecs-agent/ecs-agent/csiclient"
mock_csiclient "github.com/aws/amazon-ecs-agent/ecs-agent/csiclient/mocks"
ni "github.com/aws/amazon-ecs-agent/ecs-agent/netlib/model/networkinterface"
"github.com/aws/amazon-ecs-agent/ecs-agent/tcs/model/ecstcs"
"github.com/aws/aws-sdk-go/aws"
Expand Down Expand Up @@ -172,66 +174,100 @@ func TestServiceConnectWithDisabledMetrics(t *testing.T) {
assert.Len(t, engine.taskToServiceConnectStats, 1)
}

// TODO: Add a unhappy case in the near future
func TestFetchEBSVolumeMetrics(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
resolver := mock_resolver.NewMockContainerMetadataResolver(mockCtrl)
mockDockerClient := mock_dockerapi.NewMockDockerClient(mockCtrl)
t1 := &apitask.Task{
Arn: "t1",
Volumes: []apitask.TaskVolume{
{
Name: "1",
Type: apiresource.EBSTaskAttach,
Volume: &taskresourcevolume.EBSTaskVolumeConfig{
VolumeId: "vol-12345",
VolumeName: "test-volume",
VolumeSizeGib: "10",
SourceVolumeHostPath: "taskarn_vol-12345",
DeviceName: "/dev/nvme1n1",
FileSystem: "ext4",
tcs := []struct {
name string
setCSIClientExpectation func(*mock_csiclient.MockCSIClient)
expectedMetrics []*ecstcs.VolumeMetric
numMetrics int
}{
{
name: "Success",
setCSIClientExpectation: func(csi *mock_csiclient.MockCSIClient) {
csi.EXPECT().GetVolumeMetrics(gomock.Any(), "vol-12345", "/mnt/ecs/ebs/taskarn_vol-12345").Return(&csiclient.Metrics{
Used: 15 * 1024 * 1024 * 1024,
Capacity: 20 * 1024 * 1024 * 1024,
}, nil).Times(1)
},
expectedMetrics: []*ecstcs.VolumeMetric{
{
VolumeId: aws.String("vol-12345"),
VolumeName: aws.String("test-volume"),
Utilized: &ecstcs.UDoubleCWStatsSet{
Max: aws.Float64(15 * 1024 * 1024 * 1024),
Min: aws.Float64(15 * 1024 * 1024 * 1024),
SampleCount: aws.Int64(1),
Sum: aws.Float64(15 * 1024 * 1024 * 1024),
},
Size: &ecstcs.UDoubleCWStatsSet{
Max: aws.Float64(20 * 1024 * 1024 * 1024),
Min: aws.Float64(20 * 1024 * 1024 * 1024),
SampleCount: aws.Int64(1),
Sum: aws.Float64(20 * 1024 * 1024 * 1024),
},
},
},
numMetrics: 1,
},
}

resolver.EXPECT().ResolveTaskByARN("t1").AnyTimes().Return(t1, nil)
mockDockerClient.EXPECT().Stats(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes()

engine := NewDockerStatsEngine(&cfg, nil, eventStream("TestFetchEBSVolumeMetrics"), nil, nil)
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
engine.ctx = ctx
engine.resolver = resolver
engine.cluster = defaultCluster
engine.containerInstanceArn = defaultContainerInstance
engine.client = mockDockerClient
engine.csiClient = csiclient.NewDummyCSIClient()

expectedUsedBytes := aws.Float64(15 * 1024 * 1024 * 1024)
expectedTotalBytes := aws.Float64(20 * 1024 * 1024 * 1024)
expectedMetrics := []*ecstcs.VolumeMetric{
{
VolumeId: aws.String("vol-12345"),
VolumeName: aws.String("test-volume"),
Utilized: &ecstcs.UDoubleCWStatsSet{
Max: expectedUsedBytes,
Min: expectedUsedBytes,
SampleCount: aws.Int64(1),
Sum: expectedUsedBytes,
name: "Failure",
setCSIClientExpectation: func(csi *mock_csiclient.MockCSIClient) {
csi.EXPECT().GetVolumeMetrics(gomock.Any(), "vol-12345", "/mnt/ecs/ebs/taskarn_vol-12345").Return(nil, errors.New("err")).Times(1)
},
Size: &ecstcs.UDoubleCWStatsSet{
Max: expectedTotalBytes,
Min: expectedTotalBytes,
SampleCount: aws.Int64(1),
Sum: expectedTotalBytes,
expectedMetrics: nil,
numMetrics: 0,
},
{
name: "TimeoutFailure",
setCSIClientExpectation: func(csi *mock_csiclient.MockCSIClient) {
csi.EXPECT().GetVolumeMetrics(gomock.Any(), "vol-12345", "/mnt/ecs/ebs/taskarn_vol-12345").Return(nil, errors.New("rpc error: code = DeadlineExceeded desc = context deadline exceeded")).Times(1)
},
expectedMetrics: nil,
numMetrics: 0,
},
}

actualMetrics := engine.fetchEBSVolumeMetrics(t1, "t1")
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
resolver := mock_resolver.NewMockContainerMetadataResolver(mockCtrl)
mockDockerClient := mock_dockerapi.NewMockDockerClient(mockCtrl)
t1 := &apitask.Task{
Arn: "t1",
Volumes: []apitask.TaskVolume{
{
Name: "1",
Type: apiresource.EBSTaskAttach,
Volume: &taskresourcevolume.EBSTaskVolumeConfig{
VolumeId: "vol-12345",
VolumeName: "test-volume",
VolumeSizeGib: "10",
SourceVolumeHostPath: "taskarn_vol-12345",
DeviceName: "/dev/nvme1n1",
FileSystem: "ext4",
},
},
},
}
engine := NewDockerStatsEngine(&cfg, nil, eventStream("TestFetchEBSVolumeMetrics"), nil, nil)
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
engine.ctx = ctx
engine.resolver = resolver
engine.cluster = defaultCluster
engine.containerInstanceArn = defaultContainerInstance
engine.client = mockDockerClient

mockCsiClient := mock_csiclient.NewMockCSIClient(mockCtrl)
tc.setCSIClientExpectation(mockCsiClient)
engine.csiClient = mockCsiClient

actualMetrics := engine.fetchEBSVolumeMetrics(t1, "t1")

assert.Len(t, actualMetrics, tc.numMetrics)
assert.Equal(t, actualMetrics, tc.expectedMetrics)
})
}

assert.Len(t, actualMetrics, 1)
assert.Equal(t, actualMetrics, expectedMetrics)
}
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,20 @@ func (cfg *FSxWindowsFileServerVolumeConfig) Source() string {
return "undefined"
}

func (cfg *FSxWindowsFileServerVolumeConfig) GetType() string {
return "undefined"
}

// Currently not meant for use
func (cfg *FSxWindowsFileServerVolumeConfig) GetVolumeId() string {
return ""
}

// Currently not meant for use
func (cfg *FSxWindowsFileServerVolumeConfig) GetVolumeName() string {
return ""
}

// GetName safely returns the name of the resource
func (fv *FSxWindowsFileServerResource) GetName() string {
return "undefined"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
const (
psCredentialCommandFormat = "$(New-Object System.Management.Automation.PSCredential('%s', $(ConvertTo-SecureString '%s' -AsPlainText -Force)))"
resourceProvisioningError = "VolumeError: Agent could not create task's volume resources"
fsxVolumeType = "fsx"
)

// FSxWindowsFileServerResource represents a fsxwindowsfileserver resource
Expand Down Expand Up @@ -293,6 +294,20 @@ func (cfg *FSxWindowsFileServerVolumeConfig) Source() string {
return utils.GetCanonicalPath(cfg.HostPath)
}

func (cfg *FSxWindowsFileServerVolumeConfig) GetType() string {
return fsxVolumeType
}

func (cfg *FSxWindowsFileServerVolumeConfig) GetVolumeId() string {
return cfg.FileSystemID
}

// Note: The name is within the FSxWindowsFileServerResource struct. In order to use this in the future, this needs to be modified.
// Currently not meant for use
func (cfg *FSxWindowsFileServerVolumeConfig) GetVolumeName() string {
return ""
}

// GetName safely returns the name of the fsxwindowsfileserver resource
func (fv *FSxWindowsFileServerResource) GetName() string {
fv.lock.RLock()
Expand Down
Loading

0 comments on commit c15a7a8

Please sign in to comment.