diff --git a/internal/pkg/lifecycle/brickmanager/bricks.go b/internal/pkg/lifecycle/brickmanager/bricks.go index 25f34144..18a41c7f 100644 --- a/internal/pkg/lifecycle/brickmanager/bricks.go +++ b/internal/pkg/lifecycle/brickmanager/bricks.go @@ -227,7 +227,7 @@ func processAttach(poolRegistry registry.PoolRegistry, volumeRegistry registry.V handleError(volumeRegistry, volume, err) return } - err = plugin.Mounter().Mount(volume, bricks) // TODO pass down specific attachments? + err = plugin.Mounter().Mount(volume, bricks, attachments) // TODO pass down specific attachments? if err != nil { handleError(volumeRegistry, volume, err) return @@ -260,7 +260,7 @@ func processDetach(poolRegistry registry.PoolRegistry, volumeRegistry registry.V return } - err = plugin.Mounter().Unmount(volume, bricks) // TODO pass down specific attachments? + err = plugin.Mounter().Unmount(volume, bricks, attachments) // TODO pass down specific attachments? if err != nil { // TODO: update specific attachment into an error state? handleError(volumeRegistry, volume, err) diff --git a/internal/pkg/mocks/pfsprovider_mock.go b/internal/pkg/mocks/pfsprovider_mock.go index ab367b48..f3d9019a 100644 --- a/internal/pkg/mocks/pfsprovider_mock.go +++ b/internal/pkg/mocks/pfsprovider_mock.go @@ -153,25 +153,25 @@ func (m *MockMounter) EXPECT() *MockMounterMockRecorder { } // Mount mocks base method -func (m *MockMounter) Mount(volume registry.Volume, brickAllocations []registry.BrickAllocation) error { - ret := m.ctrl.Call(m, "Mount", volume, brickAllocations) +func (m *MockMounter) Mount(volume registry.Volume, brickAllocations []registry.BrickAllocation, attachments []registry.Attachment) error { + ret := m.ctrl.Call(m, "Mount", volume, brickAllocations, attachments) ret0, _ := ret[0].(error) return ret0 } // Mount indicates an expected call of Mount -func (mr *MockMounterMockRecorder) Mount(volume, brickAllocations interface{}) *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Mount", reflect.TypeOf((*MockMounter)(nil).Mount), volume, brickAllocations) +func (mr *MockMounterMockRecorder) Mount(volume, brickAllocations, attachments interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Mount", reflect.TypeOf((*MockMounter)(nil).Mount), volume, brickAllocations, attachments) } // Unmount mocks base method -func (m *MockMounter) Unmount(volume registry.Volume, brickAllocations []registry.BrickAllocation) error { - ret := m.ctrl.Call(m, "Unmount", volume, brickAllocations) +func (m *MockMounter) Unmount(volume registry.Volume, brickAllocations []registry.BrickAllocation, attachments []registry.Attachment) error { + ret := m.ctrl.Call(m, "Unmount", volume, brickAllocations, attachments) ret0, _ := ret[0].(error) return ret0 } // Unmount indicates an expected call of Unmount -func (mr *MockMounterMockRecorder) Unmount(volume, brickAllocations interface{}) *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Unmount", reflect.TypeOf((*MockMounter)(nil).Unmount), volume, brickAllocations) +func (mr *MockMounterMockRecorder) Unmount(volume, brickAllocations, attachments interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Unmount", reflect.TypeOf((*MockMounter)(nil).Unmount), volume, brickAllocations, attachments) } diff --git a/internal/pkg/pfsprovider/ansible/mount.go b/internal/pkg/pfsprovider/ansible/mount.go index 63510a9d..93473198 100644 --- a/internal/pkg/pfsprovider/ansible/mount.go +++ b/internal/pkg/pfsprovider/ansible/mount.go @@ -30,7 +30,7 @@ func getMdtSize() string { return mdtSize } -func mount(fsType FSType, volume registry.Volume, brickAllocations []registry.BrickAllocation) error { +func mount(fsType FSType, volume registry.Volume, brickAllocations []registry.BrickAllocation, attachments []registry.Attachment) error { log.Println("Mount for:", volume.Name) var primaryBrickHost string for _, allocation := range brickAllocations { @@ -52,7 +52,7 @@ func mount(fsType FSType, volume registry.Volume, brickAllocations []registry.Br executeAnsibleMount(fsType, volume, brickAllocations) } - for _, attachment := range volume.Attachments { + for _, attachment := range attachments { if attachment.State != registry.RequestAttach { log.Printf("Skipping volume %s attach: %+v", volume.Name, attachment) continue @@ -118,10 +118,10 @@ func mount(fsType FSType, volume registry.Volume, brickAllocations []registry.Br return nil } -func umount(fsType FSType, volume registry.Volume, brickAllocations []registry.BrickAllocation) error { +func umount(fsType FSType, volume registry.Volume, brickAllocations []registry.BrickAllocation, attachments []registry.Attachment) error { log.Println("Umount for:", volume.Name) - for _, attachment := range volume.Attachments { + for _, attachment := range attachments { if attachment.State != registry.RequestDetach { log.Printf("Skipping volume %s detach for: %+v", volume.Name, attachment) continue @@ -133,10 +133,10 @@ func umount(fsType FSType, volume registry.Volume, brickAllocations []registry.B swapFile := path.Join(mountDir, fmt.Sprintf("/swap/%s", attachment.Hostname)) // TODO share? loopback := fmt.Sprintf("/dev/loop%d", volume.ClientPort) // TODO share? if err := swapOff(attachment.Hostname, loopback); err != nil { - return err + log.Printf("Warn: failed to swap off %+v", attachment) } if err := detachLoopback(attachment.Hostname, loopback); err != nil { - return err + log.Printf("Warn: failed to detach loopback %+v", attachment) } if err := removeSubtree(attachment.Hostname, swapFile); err != nil { return err diff --git a/internal/pkg/pfsprovider/ansible/mount_test.go b/internal/pkg/pfsprovider/ansible/mount_test.go index 0c50a467..363397c3 100644 --- a/internal/pkg/pfsprovider/ansible/mount_test.go +++ b/internal/pkg/pfsprovider/ansible/mount_test.go @@ -97,33 +97,34 @@ func Test_Mount(t *testing.T) { defer func() { runner = &run{} }() fake := &fakeRunner{} runner = fake + attachments := []registry.Attachment{ + {Hostname: "client1", Job: "job1", State: registry.RequestAttach}, + {Hostname: "client2", Job: "job1", State: registry.RequestAttach}, + {Hostname: "client3", Job: "job3", State: registry.Attached}, + {Hostname: "client3", Job: "job3", State: registry.RequestDetach}, + {Hostname: "client3", Job: "job3", State: registry.Detached}, + {Hostname: "client2", Job: "job2", State: registry.RequestAttach}, + } volume := registry.Volume{ Name: "asdf", JobName: "asdf", AttachGlobalNamespace: true, AttachPrivateNamespace: true, AttachAsSwapBytes: 1024 * 1024, // 1 MiB - Attachments: []registry.Attachment{ - {Hostname: "client1", Job: "job1", State: registry.RequestAttach}, - {Hostname: "client2", Job: "job1", State: registry.RequestAttach}, - {Hostname: "client3", Job: "job3", State: registry.Attached}, - {Hostname: "client3", Job: "job3", State: registry.RequestDetach}, - {Hostname: "client3", Job: "job3", State: registry.Detached}, - {Hostname: "client2", Job: "job2", State: registry.RequestAttach}, - }, - ClientPort: 42, - Owner: 1001, - Group: 1001, + Attachments: attachments, + ClientPort: 42, + Owner: 1001, + Group: 1001, } assert.PanicsWithValue(t, "failed to find primary brick for volume: asdf", - func() { mount(Lustre, volume, nil) }) + func() { mount(Lustre, volume, nil, nil) }) bricks := []registry.BrickAllocation{ {Hostname: "host1"}, {Hostname: "host2"}, } - err := mount(Lustre, volume, bricks) + err := mount(Lustre, volume, bricks, attachments) assert.Nil(t, err) assert.Equal(t, 53, fake.calls) @@ -162,28 +163,29 @@ func Test_Umount(t *testing.T) { defer func() { runner = &run{} }() fake := &fakeRunner{} runner = fake + attachments := []registry.Attachment{ + {Hostname: "client1", Job: "job4", State: registry.RequestDetach}, + {Hostname: "client2", Job: "job4", State: registry.RequestDetach}, + {Hostname: "client3", Job: "job3", State: registry.Attached}, + {Hostname: "client3", Job: "job3", State: registry.RequestAttach}, + {Hostname: "client3", Job: "job3", State: registry.Detached}, + {Hostname: "client2", Job: "job1", State: registry.RequestDetach}, + } volume := registry.Volume{ Name: "asdf", JobName: "asdf", AttachGlobalNamespace: true, AttachPrivateNamespace: true, AttachAsSwapBytes: 10000, - Attachments: []registry.Attachment{ - {Hostname: "client1", Job: "job4", State: registry.RequestDetach}, - {Hostname: "client2", Job: "job4", State: registry.RequestDetach}, - {Hostname: "client3", Job: "job3", State: registry.Attached}, - {Hostname: "client3", Job: "job3", State: registry.RequestAttach}, - {Hostname: "client3", Job: "job3", State: registry.Detached}, - {Hostname: "client2", Job: "job1", State: registry.RequestDetach}, - }, - ClientPort: 42, - Owner: 1001, - Group: 1001, + Attachments: attachments, + ClientPort: 42, + Owner: 1001, + Group: 1001, } bricks := []registry.BrickAllocation{ {Hostname: "host1"}, {Hostname: "host2"}, } - err := umount(Lustre, volume, bricks) + err := umount(Lustre, volume, bricks, attachments) assert.Nil(t, err) assert.Equal(t, 20, fake.calls) @@ -207,24 +209,25 @@ func Test_Umount_multi(t *testing.T) { defer func() { runner = &run{} }() fake := &fakeRunner{} runner = fake + attachments := []registry.Attachment{ + {Hostname: "client1", Job: "job1", State: registry.RequestDetach}, + } volume := registry.Volume{ MultiJob: true, Name: "asdf", JobName: "asdf", AttachGlobalNamespace: true, AttachPrivateNamespace: true, AttachAsSwapBytes: 10000, - Attachments: []registry.Attachment{ - {Hostname: "client1", Job: "job1", State: registry.RequestDetach}, - }, - ClientPort: 42, - Owner: 1001, - Group: 1001, + Attachments: attachments, + ClientPort: 42, + Owner: 1001, + Group: 1001, } bricks := []registry.BrickAllocation{ {Hostname: "host1"}, {Hostname: "host2"}, } - err := umount(Lustre, volume, bricks) + err := umount(Lustre, volume, bricks, attachments) assert.Nil(t, err) assert.Equal(t, 3, fake.calls) @@ -238,25 +241,26 @@ func Test_Mount_multi(t *testing.T) { defer func() { runner = &run{} }() fake := &fakeRunner{} runner = fake + attachments := []registry.Attachment{ + {Hostname: "client1", Job: "job1", State: registry.RequestAttach}, + } volume := registry.Volume{ MultiJob: true, Name: "asdf", JobName: "asdf", AttachGlobalNamespace: true, AttachPrivateNamespace: true, AttachAsSwapBytes: 10000, - Attachments: []registry.Attachment{ - {Hostname: "client1", Job: "job1", State: registry.RequestAttach}, - }, - ClientPort: 42, - Owner: 1001, - Group: 1001, - UUID: "medkDfdg", + Attachments: attachments, + ClientPort: 42, + Owner: 1001, + Group: 1001, + UUID: "medkDfdg", } bricks := []registry.BrickAllocation{ {Hostname: "host1"}, {Hostname: "host2"}, } - err := mount(Lustre, volume, bricks) + err := mount(Lustre, volume, bricks, attachments) assert.Nil(t, err) assert.Equal(t, 5, fake.calls) diff --git a/internal/pkg/pfsprovider/ansible/plugin.go b/internal/pkg/pfsprovider/ansible/plugin.go index cdc9e4e4..a55f0678 100644 --- a/internal/pkg/pfsprovider/ansible/plugin.go +++ b/internal/pkg/pfsprovider/ansible/plugin.go @@ -87,10 +87,10 @@ type mounter struct { FSType FSType } -func (mounter *mounter) Mount(volume registry.Volume, brickAllocations []registry.BrickAllocation) error { - return mount(mounter.FSType, volume, brickAllocations) +func (mounter *mounter) Mount(volume registry.Volume, brickAllocations []registry.BrickAllocation, attachments []registry.Attachment) error { + return mount(mounter.FSType, volume, brickAllocations, attachments) } -func (mounter *mounter) Unmount(volume registry.Volume, brickAllocations []registry.BrickAllocation) error { - return umount(mounter.FSType, volume, brickAllocations) +func (mounter *mounter) Unmount(volume registry.Volume, brickAllocations []registry.BrickAllocation, attachments []registry.Attachment) error { + return umount(mounter.FSType, volume, brickAllocations, attachments) } diff --git a/internal/pkg/pfsprovider/fake/plugin.go b/internal/pkg/pfsprovider/fake/plugin.go index 8e9f1151..1583c43c 100644 --- a/internal/pkg/pfsprovider/fake/plugin.go +++ b/internal/pkg/pfsprovider/fake/plugin.go @@ -44,12 +44,12 @@ func (*volumeProvider) CopyDataOut(volume registry.Volume) error { type mounter struct{} -func (*mounter) Mount(volume registry.Volume, brickAllocations []registry.BrickAllocation) error { +func (*mounter) Mount(volume registry.Volume, brickAllocations []registry.BrickAllocation, attachments []registry.Attachment) error { log.Println("Mount for:", volume.Name) return nil } -func (*mounter) Unmount(volume registry.Volume, brickAllocations []registry.BrickAllocation) error { +func (*mounter) Unmount(volume registry.Volume, brickAllocations []registry.BrickAllocation, attachments []registry.Attachment) error { log.Println("Umount for:", volume.Name) return nil } diff --git a/internal/pkg/pfsprovider/interface.go b/internal/pkg/pfsprovider/interface.go index e1f5f7a5..4bbc85d0 100644 --- a/internal/pkg/pfsprovider/interface.go +++ b/internal/pkg/pfsprovider/interface.go @@ -22,6 +22,6 @@ type VolumeProvider interface { // Actions that are sent to remote hosts, // typically compute nodes and primary brick hosts type Mounter interface { - Mount(volume registry.Volume, brickAllocations []registry.BrickAllocation) error - Unmount(volume registry.Volume, brickAllocations []registry.BrickAllocation) error + Mount(volume registry.Volume, brickAllocations []registry.BrickAllocation, attachments []registry.Attachment) error + Unmount(volume registry.Volume, brickAllocations []registry.BrickAllocation, attachments []registry.Attachment) error }