From 4d7c49a8d655eb2e7d1430f6b4da8eb75c4a85f6 Mon Sep 17 00:00:00 2001 From: John Garbutt Date: Fri, 14 Dec 2018 12:38:44 +0000 Subject: [PATCH 01/22] Add some ideas for better watch primitives --- cmd/dacctl/main_test.go | 6 ++ internal/pkg/etcdregistry/keystore.go | 109 ++++++++++++++++++++-- internal/pkg/keystoreregistry/keystore.go | 9 ++ internal/pkg/mocks/keystore_mock.go | 25 +++++ 4 files changed, 143 insertions(+), 6 deletions(-) diff --git a/cmd/dacctl/main_test.go b/cmd/dacctl/main_test.go index 7594554f..8a5b67eb 100644 --- a/cmd/dacctl/main_test.go +++ b/cmd/dacctl/main_test.go @@ -175,6 +175,12 @@ func (*stubKeystore) KeepAliveKey(key string) error { func (*stubKeystore) NewMutex(lockKey string) (keystoreregistry.Mutex, error) { panic("implement me") } +func (*stubKeystore) Watch(ctxt context.Context, key string, withPrefix bool) <-chan keystoreregistry.KeyValueUpdate { + panic("implement me") +} +func (*stubKeystore) WatchForCondition(ctxt context.Context, key string, fromRevision int64, check func(update keystoreregistry.KeyValueUpdate) bool) (bool, error) { + panic("implement me") +} type stubDacctlActions struct{} diff --git a/internal/pkg/etcdregistry/keystore.go b/internal/pkg/etcdregistry/keystore.go index ef7ce156..f81c1495 100644 --- a/internal/pkg/etcdregistry/keystore.go +++ b/internal/pkg/etcdregistry/keystore.go @@ -65,11 +65,11 @@ func newEtcdClient() *clientv3.Client { func NewKeystore() keystoreregistry.Keystore { cli := newEtcdClient() - return &etcKeystore{cli} + return &etcKeystore{Client: cli} } type etcKeystore struct { - *clientv3.Client + Client *clientv3.Client } func (client *etcKeystore) NewMutex(lockKey string) (keystoreregistry.Mutex, error) { @@ -97,6 +97,10 @@ func handleError(err error) { } } +func (client *etcKeystore) Close() error { + return client.Client.Close() +} + func (client *etcKeystore) runTransaction(ifOps []clientv3.Cmp, thenOps []clientv3.Op) error { kvc := clientv3.NewKV(client.Client) kvc.Txn(context.Background()) @@ -195,7 +199,7 @@ func (client *etcKeystore) Get(key string) (keystoreregistry.KeyValueVersion, er func (client *etcKeystore) WatchPrefix(prefix string, onUpdate func(old *keystoreregistry.KeyValueVersion, new *keystoreregistry.KeyValueVersion)) { - rch := client.Watch(context.Background(), prefix, clientv3.WithPrefix(), clientv3.WithPrevKV()) + rch := client.Client.Watch(context.Background(), prefix, clientv3.WithPrefix(), clientv3.WithPrevKV()) go func() { for wresp := range rch { for _, ev := range wresp.Events { @@ -213,7 +217,7 @@ func (client *etcKeystore) WatchPrefix(prefix string, func (client *etcKeystore) WatchKey(ctxt context.Context, key string, onUpdate func(old *keystoreregistry.KeyValueVersion, new *keystoreregistry.KeyValueVersion)) { - rch := client.Watch(ctxt, key, clientv3.WithPrevKV()) + rch := client.Client.Watch(ctxt, key, clientv3.WithPrevKV()) go func() { for watchResponse := range rch { for _, ev := range watchResponse.Events { @@ -232,6 +236,99 @@ func (client *etcKeystore) WatchKey(ctxt context.Context, key string, }() } +func (client *etcKeystore) Watch(ctxt context.Context, key string, withPrefix bool) <-chan keystoreregistry.KeyValueUpdate { + + // open channel with etcd + options := []clientv3.OpOption{clientv3.WithPrevKV()} + if withPrefix { + options = append(options, clientv3.WithPrefix()) + } + rch := client.Client.Watch(ctxt, key, options...) + + c := make(chan keystoreregistry.KeyValueUpdate) + + go func() { + for watchResponse := range rch { + for _, ev := range watchResponse.Events { + update := keystoreregistry.KeyValueUpdate{ + New: getKeyValueVersion(ev.Kv), + Old: getKeyValueVersion(ev.PrevKv), + } + // show deleted by returning nil for new + if update.New != nil && update.New.CreateRevision == 0 { + update.New = nil + // TODO: should we cancel if the key is deleted? + // or just let the user deal with it? + } + c <- update + } + } + // Assuming we get here when the context is cancelled or hits its timeout + // i.e. there are no more events, so we close the channel + close(c) + }() + + return c +} + +func (client *etcKeystore) WatchForCondition(ctxt context.Context, key string, fromRevision int64, + check func(update keystoreregistry.KeyValueUpdate) bool) (bool, error) { + + // check key is present and find revision of the last update + initialValue, err := client.Get(key) + if err != nil { + return false, err + } + if fromRevision < initialValue.CreateRevision { + return false, errors.New("incorrect fromRevision") + } + + // no deadline set, so add default timeout of 10 mins + var cancelFunc context.CancelFunc + _, ok := ctxt.Deadline() + if !ok { + ctxt, cancelFunc = context.WithTimeout(ctxt, time.Minute*10) + } + + // open channel with etcd, starting with the last revision of the key from above + rch := client.Client.Watch(ctxt, key, clientv3.WithPrefix(), clientv3.WithRev(fromRevision)) + if rch == nil { + cancelFunc() + return false, errors.New("no watcher returned from etcd") + } + + conditionMet := false + go func() { + for watchResponse := range rch { + for _, ev := range watchResponse.Events { + update := keystoreregistry.KeyValueUpdate{ + New: getKeyValueVersion(ev.Kv), + Old: getKeyValueVersion(ev.PrevKv), + } + + // show deleted by returning nil for new + isKeyDeleted := false + if ev.Type == clientv3.EventTypeDelete { + update.New = nil + isKeyDeleted = true + } + + conditionMet := check(update) + + // stop watching if the condition passed or key was deleted + if conditionMet || isKeyDeleted { + cancelFunc() + return + } + } + } + // Assuming we get here when the context is cancelled or hits its timeout + // i.e. there are no more events, so we close the channel + }() + + return conditionMet, nil +} + func (client *etcKeystore) KeepAliveKey(key string) error { kvc := clientv3.NewKV(client.Client) @@ -243,7 +340,7 @@ func (client *etcKeystore) KeepAliveKey(key string) error { // TODO what about configure timeout and ttl? var ttl int64 = 10 - grantResponse, err := client.Grant(context.Background(), ttl) + grantResponse, err := client.Client.Grant(context.Background(), ttl) if err != nil { log.Fatal(err) } @@ -258,7 +355,7 @@ func (client *etcKeystore) KeepAliveKey(key string) error { return fmt.Errorf("unable to create keep-alive key: %s", key) } - ch, err := client.KeepAlive(context.Background(), leaseID) + ch, err := client.Client.KeepAlive(context.Background(), leaseID) if err != nil { log.Fatal(err) } diff --git a/internal/pkg/keystoreregistry/keystore.go b/internal/pkg/keystoreregistry/keystore.go index f739f4d2..b27b76b6 100644 --- a/internal/pkg/keystoreregistry/keystore.go +++ b/internal/pkg/keystoreregistry/keystore.go @@ -57,6 +57,10 @@ type Keystore interface { // When callback returns true, stop watch the key WatchKey(ctxt context.Context, key string, onUpdate func(old *KeyValueVersion, new *KeyValueVersion)) + // TODO: WIP to replce above watch functions + Watch(ctxt context.Context, key string, withPrefix bool) <-chan KeyValueUpdate + WatchForCondition(ctxt context.Context, key string, fromRevision int64, check func(update KeyValueUpdate) bool) (bool, error) + // Add a key, and remove it when calling process dies // Error is returned if the key already exists KeepAliveKey(key string) error @@ -77,6 +81,11 @@ type KeyValueVersion struct { ModRevision int64 } +type KeyValueUpdate struct { + Old *KeyValueVersion + New *KeyValueVersion +} + func (kvv KeyValueVersion) String() string { return toJson(kvv) } diff --git a/internal/pkg/mocks/keystore_mock.go b/internal/pkg/mocks/keystore_mock.go index 7cd5773e..1258fb62 100644 --- a/internal/pkg/mocks/keystore_mock.go +++ b/internal/pkg/mocks/keystore_mock.go @@ -140,6 +140,31 @@ func (mr *MockKeystoreMockRecorder) WatchKey(ctxt, key, onUpdate interface{}) *g return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WatchKey", reflect.TypeOf((*MockKeystore)(nil).WatchKey), ctxt, key, onUpdate) } +// Watch mocks base method +func (m *MockKeystore) Watch(ctxt context.Context, key string, withPrefix bool) <-chan keystoreregistry.KeyValueUpdate { + ret := m.ctrl.Call(m, "Watch", ctxt, key, withPrefix) + ret0, _ := ret[0].(<-chan keystoreregistry.KeyValueUpdate) + return ret0 +} + +// Watch indicates an expected call of Watch +func (mr *MockKeystoreMockRecorder) Watch(ctxt, key, withPrefix interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Watch", reflect.TypeOf((*MockKeystore)(nil).Watch), ctxt, key, withPrefix) +} + +// WatchForCondition mocks base method +func (m *MockKeystore) WatchForCondition(ctxt context.Context, key string, fromRevision int64, check func(keystoreregistry.KeyValueUpdate) bool) (bool, error) { + ret := m.ctrl.Call(m, "WatchForCondition", ctxt, key, fromRevision, check) + ret0, _ := ret[0].(bool) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// WatchForCondition indicates an expected call of WatchForCondition +func (mr *MockKeystoreMockRecorder) WatchForCondition(ctxt, key, fromRevision, check interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WatchForCondition", reflect.TypeOf((*MockKeystore)(nil).WatchForCondition), ctxt, key, fromRevision, check) +} + // KeepAliveKey mocks base method func (m *MockKeystore) KeepAliveKey(key string) error { ret := m.ctrl.Call(m, "KeepAliveKey", key) From 114d2af0f04f260234ac157421464c0c99f9cdb1 Mon Sep 17 00:00:00 2001 From: John Garbutt Date: Thu, 3 Jan 2019 09:57:10 +0000 Subject: [PATCH 02/22] Tidy up watch helper in etcdregistry Add clear errors, create, modify and delete signals. Keep the same clear end signal by closing the channel at the right moment. --- internal/pkg/etcdregistry/keystore.go | 26 +++++++++++++++-------- internal/pkg/keystoreregistry/keystore.go | 8 +++++-- 2 files changed, 23 insertions(+), 11 deletions(-) diff --git a/internal/pkg/etcdregistry/keystore.go b/internal/pkg/etcdregistry/keystore.go index f81c1495..f1f652c6 100644 --- a/internal/pkg/etcdregistry/keystore.go +++ b/internal/pkg/etcdregistry/keystore.go @@ -237,8 +237,6 @@ func (client *etcKeystore) WatchKey(ctxt context.Context, key string, } func (client *etcKeystore) Watch(ctxt context.Context, key string, withPrefix bool) <-chan keystoreregistry.KeyValueUpdate { - - // open channel with etcd options := []clientv3.OpOption{clientv3.WithPrevKV()} if withPrefix { options = append(options, clientv3.WithPrefix()) @@ -249,20 +247,30 @@ func (client *etcKeystore) Watch(ctxt context.Context, key string, withPrefix bo go func() { for watchResponse := range rch { + // if error, send empty update with an error + err := watchResponse.Err() + if err != nil { + c <- keystoreregistry.KeyValueUpdate{Err: err} + } + + // send all events in this watch response for _, ev := range watchResponse.Events { update := keystoreregistry.KeyValueUpdate{ - New: getKeyValueVersion(ev.Kv), - Old: getKeyValueVersion(ev.PrevKv), + IsCreate: ev.IsCreate(), + IsModify: ev.IsModify(), + IsDelete: ev.Type == clientv3.EventTypeDelete, } - // show deleted by returning nil for new - if update.New != nil && update.New.CreateRevision == 0 { - update.New = nil - // TODO: should we cancel if the key is deleted? - // or just let the user deal with it? + if update.IsCreate || update.IsModify { + update.New = getKeyValueVersion(ev.Kv) + } + if update.IsDelete || update.IsModify { + update.Old = getKeyValueVersion(ev.PrevKv) } + c <- update } } + // Assuming we get here when the context is cancelled or hits its timeout // i.e. there are no more events, so we close the channel close(c) diff --git a/internal/pkg/keystoreregistry/keystore.go b/internal/pkg/keystoreregistry/keystore.go index b27b76b6..246cbffc 100644 --- a/internal/pkg/keystoreregistry/keystore.go +++ b/internal/pkg/keystoreregistry/keystore.go @@ -82,8 +82,12 @@ type KeyValueVersion struct { } type KeyValueUpdate struct { - Old *KeyValueVersion - New *KeyValueVersion + Old *KeyValueVersion + New *KeyValueVersion + IsCreate bool + IsModify bool + IsDelete bool + Err error } func (kvv KeyValueVersion) String() string { From c5d32634c7afbfdab83a9b85a5afac126573a96f Mon Sep 17 00:00:00 2001 From: John Garbutt Date: Thu, 3 Jan 2019 10:10:39 +0000 Subject: [PATCH 03/22] Clean up interface to watch method --- cmd/dacctl/main_test.go | 5 +- internal/pkg/etcdregistry/keystore.go | 60 ++++++++++++----------- internal/pkg/keystoreregistry/keystore.go | 11 ++++- internal/pkg/mocks/keystore_mock.go | 17 +------ 4 files changed, 44 insertions(+), 49 deletions(-) diff --git a/cmd/dacctl/main_test.go b/cmd/dacctl/main_test.go index 8a5b67eb..0c56340d 100644 --- a/cmd/dacctl/main_test.go +++ b/cmd/dacctl/main_test.go @@ -175,10 +175,7 @@ func (*stubKeystore) KeepAliveKey(key string) error { func (*stubKeystore) NewMutex(lockKey string) (keystoreregistry.Mutex, error) { panic("implement me") } -func (*stubKeystore) Watch(ctxt context.Context, key string, withPrefix bool) <-chan keystoreregistry.KeyValueUpdate { - panic("implement me") -} -func (*stubKeystore) WatchForCondition(ctxt context.Context, key string, fromRevision int64, check func(update keystoreregistry.KeyValueUpdate) bool) (bool, error) { +func (*stubKeystore) Watch(ctxt context.Context, key string, withPrefix bool) keystoreregistry.KeyValueUpdateChan { panic("implement me") } diff --git a/internal/pkg/etcdregistry/keystore.go b/internal/pkg/etcdregistry/keystore.go index f1f652c6..74117b12 100644 --- a/internal/pkg/etcdregistry/keystore.go +++ b/internal/pkg/etcdregistry/keystore.go @@ -236,7 +236,7 @@ func (client *etcKeystore) WatchKey(ctxt context.Context, key string, }() } -func (client *etcKeystore) Watch(ctxt context.Context, key string, withPrefix bool) <-chan keystoreregistry.KeyValueUpdate { +func (client *etcKeystore) Watch(ctxt context.Context, key string, withPrefix bool) keystoreregistry.KeyValueUpdateChan { options := []clientv3.OpOption{clientv3.WithPrevKV()} if withPrefix { options = append(options, clientv3.WithPrefix()) @@ -245,40 +245,43 @@ func (client *etcKeystore) Watch(ctxt context.Context, key string, withPrefix bo c := make(chan keystoreregistry.KeyValueUpdate) - go func() { - for watchResponse := range rch { - // if error, send empty update with an error - err := watchResponse.Err() - if err != nil { - c <- keystoreregistry.KeyValueUpdate{Err: err} - } + go processWatchEvents(rch, c) - // send all events in this watch response - for _, ev := range watchResponse.Events { - update := keystoreregistry.KeyValueUpdate{ - IsCreate: ev.IsCreate(), - IsModify: ev.IsModify(), - IsDelete: ev.Type == clientv3.EventTypeDelete, - } - if update.IsCreate || update.IsModify { - update.New = getKeyValueVersion(ev.Kv) - } - if update.IsDelete || update.IsModify { - update.Old = getKeyValueVersion(ev.PrevKv) - } + return c +} - c <- update - } +func processWatchEvents(watchChan clientv3.WatchChan, c chan keystoreregistry.KeyValueUpdate) { + for watchResponse := range watchChan { + // if error, send empty update with an error + err := watchResponse.Err() + if err != nil { + c <- keystoreregistry.KeyValueUpdate{Err: err} } - // Assuming we get here when the context is cancelled or hits its timeout - // i.e. there are no more events, so we close the channel - close(c) - }() + // send all events in this watch response + for _, ev := range watchResponse.Events { + update := keystoreregistry.KeyValueUpdate{ + IsCreate: ev.IsCreate(), + IsModify: ev.IsModify(), + IsDelete: ev.Type == clientv3.EventTypeDelete, + } + if update.IsCreate || update.IsModify { + update.New = getKeyValueVersion(ev.Kv) + } + if update.IsDelete || update.IsModify { + update.Old = getKeyValueVersion(ev.PrevKv) + } - return c + c <- update + } + } + + // Assuming we get here when the context is cancelled or hits its timeout + // i.e. there are no more events, so we close the channel + close(c) } +// TODO: this needs fixing up func (client *etcKeystore) WatchForCondition(ctxt context.Context, key string, fromRevision int64, check func(update keystoreregistry.KeyValueUpdate) bool) (bool, error) { @@ -308,6 +311,7 @@ func (client *etcKeystore) WatchForCondition(ctxt context.Context, key string, f conditionMet := false go func() { for watchResponse := range rch { + // TODO: this should instead use Watch from above! for _, ev := range watchResponse.Events { update := keystoreregistry.KeyValueUpdate{ New: getKeyValueVersion(ev.Kv), diff --git a/internal/pkg/keystoreregistry/keystore.go b/internal/pkg/keystoreregistry/keystore.go index 246cbffc..e747f076 100644 --- a/internal/pkg/keystoreregistry/keystore.go +++ b/internal/pkg/keystoreregistry/keystore.go @@ -57,9 +57,14 @@ type Keystore interface { // When callback returns true, stop watch the key WatchKey(ctxt context.Context, key string, onUpdate func(old *KeyValueVersion, new *KeyValueVersion)) + // Get a channel containing all KeyValueUpdate events + // + // Use the context to control if you watch forever, or if you choose to cancel when a key + // is deleted, or you stop watching after some timeout. + Watch(ctxt context.Context, key string, withPrefix bool) KeyValueUpdateChan + // TODO: WIP to replce above watch functions - Watch(ctxt context.Context, key string, withPrefix bool) <-chan KeyValueUpdate - WatchForCondition(ctxt context.Context, key string, fromRevision int64, check func(update KeyValueUpdate) bool) (bool, error) + // WatchForCondition(ctxt context.Context, key string, fromRevision int64, check func(update KeyValueUpdate) bool) (bool, error) // Add a key, and remove it when calling process dies // Error is returned if the key already exists @@ -69,6 +74,8 @@ type Keystore interface { NewMutex(lockKey string) (Mutex, error) } +type KeyValueUpdateChan <-chan KeyValueUpdate + type KeyValue struct { Key string Value string // TODO: should this be []byte? Or have a json parsed version? diff --git a/internal/pkg/mocks/keystore_mock.go b/internal/pkg/mocks/keystore_mock.go index 1258fb62..ec549cc2 100644 --- a/internal/pkg/mocks/keystore_mock.go +++ b/internal/pkg/mocks/keystore_mock.go @@ -141,9 +141,9 @@ func (mr *MockKeystoreMockRecorder) WatchKey(ctxt, key, onUpdate interface{}) *g } // Watch mocks base method -func (m *MockKeystore) Watch(ctxt context.Context, key string, withPrefix bool) <-chan keystoreregistry.KeyValueUpdate { +func (m *MockKeystore) Watch(ctxt context.Context, key string, withPrefix bool) keystoreregistry.KeyValueUpdateChan { ret := m.ctrl.Call(m, "Watch", ctxt, key, withPrefix) - ret0, _ := ret[0].(<-chan keystoreregistry.KeyValueUpdate) + ret0, _ := ret[0].(keystoreregistry.KeyValueUpdateChan) return ret0 } @@ -152,19 +152,6 @@ func (mr *MockKeystoreMockRecorder) Watch(ctxt, key, withPrefix interface{}) *go return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Watch", reflect.TypeOf((*MockKeystore)(nil).Watch), ctxt, key, withPrefix) } -// WatchForCondition mocks base method -func (m *MockKeystore) WatchForCondition(ctxt context.Context, key string, fromRevision int64, check func(keystoreregistry.KeyValueUpdate) bool) (bool, error) { - ret := m.ctrl.Call(m, "WatchForCondition", ctxt, key, fromRevision, check) - ret0, _ := ret[0].(bool) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// WatchForCondition indicates an expected call of WatchForCondition -func (mr *MockKeystoreMockRecorder) WatchForCondition(ctxt, key, fromRevision, check interface{}) *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WatchForCondition", reflect.TypeOf((*MockKeystore)(nil).WatchForCondition), ctxt, key, fromRevision, check) -} - // KeepAliveKey mocks base method func (m *MockKeystore) KeepAliveKey(key string) error { ret := m.ctrl.Call(m, "KeepAliveKey", key) From e0d2ac95c14992e6db8e23eaa3af1775839aee74 Mon Sep 17 00:00:00 2001 From: John Garbutt Date: Thu, 3 Jan 2019 10:13:49 +0000 Subject: [PATCH 04/22] Extract watch method --- internal/pkg/etcdregistry/keystore.go | 45 ----------------------- internal/pkg/etcdregistry/watch.go | 52 +++++++++++++++++++++++++++ 2 files changed, 52 insertions(+), 45 deletions(-) create mode 100644 internal/pkg/etcdregistry/watch.go diff --git a/internal/pkg/etcdregistry/keystore.go b/internal/pkg/etcdregistry/keystore.go index 74117b12..8c0e05c6 100644 --- a/internal/pkg/etcdregistry/keystore.go +++ b/internal/pkg/etcdregistry/keystore.go @@ -236,51 +236,6 @@ func (client *etcKeystore) WatchKey(ctxt context.Context, key string, }() } -func (client *etcKeystore) Watch(ctxt context.Context, key string, withPrefix bool) keystoreregistry.KeyValueUpdateChan { - options := []clientv3.OpOption{clientv3.WithPrevKV()} - if withPrefix { - options = append(options, clientv3.WithPrefix()) - } - rch := client.Client.Watch(ctxt, key, options...) - - c := make(chan keystoreregistry.KeyValueUpdate) - - go processWatchEvents(rch, c) - - return c -} - -func processWatchEvents(watchChan clientv3.WatchChan, c chan keystoreregistry.KeyValueUpdate) { - for watchResponse := range watchChan { - // if error, send empty update with an error - err := watchResponse.Err() - if err != nil { - c <- keystoreregistry.KeyValueUpdate{Err: err} - } - - // send all events in this watch response - for _, ev := range watchResponse.Events { - update := keystoreregistry.KeyValueUpdate{ - IsCreate: ev.IsCreate(), - IsModify: ev.IsModify(), - IsDelete: ev.Type == clientv3.EventTypeDelete, - } - if update.IsCreate || update.IsModify { - update.New = getKeyValueVersion(ev.Kv) - } - if update.IsDelete || update.IsModify { - update.Old = getKeyValueVersion(ev.PrevKv) - } - - c <- update - } - } - - // Assuming we get here when the context is cancelled or hits its timeout - // i.e. there are no more events, so we close the channel - close(c) -} - // TODO: this needs fixing up func (client *etcKeystore) WatchForCondition(ctxt context.Context, key string, fromRevision int64, check func(update keystoreregistry.KeyValueUpdate) bool) (bool, error) { diff --git a/internal/pkg/etcdregistry/watch.go b/internal/pkg/etcdregistry/watch.go new file mode 100644 index 00000000..c0bceefc --- /dev/null +++ b/internal/pkg/etcdregistry/watch.go @@ -0,0 +1,52 @@ +package etcdregistry + +import ( + "context" + "github.com/RSE-Cambridge/data-acc/internal/pkg/keystoreregistry" + "github.com/coreos/etcd/clientv3" +) + +func (client *etcKeystore) Watch(ctxt context.Context, key string, withPrefix bool) keystoreregistry.KeyValueUpdateChan { + options := []clientv3.OpOption{clientv3.WithPrevKV()} + if withPrefix { + options = append(options, clientv3.WithPrefix()) + } + rch := client.Client.Watch(ctxt, key, options...) + + c := make(chan keystoreregistry.KeyValueUpdate) + + go processWatchEvents(rch, c) + + return c +} + +func processWatchEvents(watchChan clientv3.WatchChan, c chan keystoreregistry.KeyValueUpdate) { + for watchResponse := range watchChan { + // if error, send empty update with an error + err := watchResponse.Err() + if err != nil { + c <- keystoreregistry.KeyValueUpdate{Err: err} + } + + // send all events in this watch response + for _, ev := range watchResponse.Events { + update := keystoreregistry.KeyValueUpdate{ + IsCreate: ev.IsCreate(), + IsModify: ev.IsModify(), + IsDelete: ev.Type == clientv3.EventTypeDelete, + } + if update.IsCreate || update.IsModify { + update.New = getKeyValueVersion(ev.Kv) + } + if update.IsDelete || update.IsModify { + update.Old = getKeyValueVersion(ev.PrevKv) + } + + c <- update + } + } + + // Assuming we get here when the context is cancelled or hits its timeout + // i.e. there are no more events, so we close the channel + close(c) +} From d78fc05098c6252fa717c86a4935f0c3bd122aef Mon Sep 17 00:00:00 2001 From: John Garbutt Date: Thu, 3 Jan 2019 10:31:51 +0000 Subject: [PATCH 05/22] Add interface based use of etcd client --- internal/pkg/etcdregistry/keystore.go | 10 +++++++++- internal/pkg/etcdregistry/watch.go | 2 +- internal/pkg/etcdregistry/watch_test.go | 7 +++++++ 3 files changed, 17 insertions(+), 2 deletions(-) create mode 100644 internal/pkg/etcdregistry/watch_test.go diff --git a/internal/pkg/etcdregistry/keystore.go b/internal/pkg/etcdregistry/keystore.go index 8c0e05c6..9d37f591 100644 --- a/internal/pkg/etcdregistry/keystore.go +++ b/internal/pkg/etcdregistry/keystore.go @@ -65,10 +65,18 @@ func newEtcdClient() *clientv3.Client { func NewKeystore() keystoreregistry.Keystore { cli := newEtcdClient() - return &etcKeystore{Client: cli} + return &etcKeystore{ + Watcher: cli.Watcher, + KV: cli.KV, + Lease: cli.Lease, + Client: cli, + } } type etcKeystore struct { + Watcher clientv3.Watcher + KV clientv3.KV + Lease clientv3.Lease Client *clientv3.Client } diff --git a/internal/pkg/etcdregistry/watch.go b/internal/pkg/etcdregistry/watch.go index c0bceefc..78b4b0c9 100644 --- a/internal/pkg/etcdregistry/watch.go +++ b/internal/pkg/etcdregistry/watch.go @@ -11,7 +11,7 @@ func (client *etcKeystore) Watch(ctxt context.Context, key string, withPrefix bo if withPrefix { options = append(options, clientv3.WithPrefix()) } - rch := client.Client.Watch(ctxt, key, options...) + rch := client.Watcher.Watch(ctxt, key, options...) c := make(chan keystoreregistry.KeyValueUpdate) diff --git a/internal/pkg/etcdregistry/watch_test.go b/internal/pkg/etcdregistry/watch_test.go new file mode 100644 index 00000000..44f35de5 --- /dev/null +++ b/internal/pkg/etcdregistry/watch_test.go @@ -0,0 +1,7 @@ +package etcdregistry + +import "testing" + +func TestEtcKeystore_Watch(t *testing.T) { + +} From d97a7a4adc3fae96b2b2a8863470c28b12ee12cf Mon Sep 17 00:00:00 2001 From: John Garbutt Date: Thu, 3 Jan 2019 10:53:12 +0000 Subject: [PATCH 06/22] Add initial watcher unit test --- internal/pkg/etcdregistry/keystore.go | 12 ++++----- internal/pkg/etcdregistry/watch_test.go | 33 +++++++++++++++++++++++-- 2 files changed, 37 insertions(+), 8 deletions(-) diff --git a/internal/pkg/etcdregistry/keystore.go b/internal/pkg/etcdregistry/keystore.go index 9d37f591..d647832d 100644 --- a/internal/pkg/etcdregistry/keystore.go +++ b/internal/pkg/etcdregistry/keystore.go @@ -67,17 +67,17 @@ func NewKeystore() keystoreregistry.Keystore { cli := newEtcdClient() return &etcKeystore{ Watcher: cli.Watcher, - KV: cli.KV, - Lease: cli.Lease, - Client: cli, + KV: cli.KV, + Lease: cli.Lease, + Client: cli, } } type etcKeystore struct { Watcher clientv3.Watcher - KV clientv3.KV - Lease clientv3.Lease - Client *clientv3.Client + KV clientv3.KV + Lease clientv3.Lease + Client *clientv3.Client } func (client *etcKeystore) NewMutex(lockKey string) (keystoreregistry.Mutex, error) { diff --git a/internal/pkg/etcdregistry/watch_test.go b/internal/pkg/etcdregistry/watch_test.go index 44f35de5..fed1b125 100644 --- a/internal/pkg/etcdregistry/watch_test.go +++ b/internal/pkg/etcdregistry/watch_test.go @@ -1,7 +1,36 @@ package etcdregistry -import "testing" +import ( + "context" + "github.com/coreos/etcd/clientv3" + "github.com/stretchr/testify/assert" + "testing" +) -func TestEtcKeystore_Watch(t *testing.T) { +type fakeWatcher struct { + t *testing.T + ch clientv3.WatchChan + opts []clientv3.OpOption +} + +func (fw fakeWatcher) Watch(ctx context.Context, key string, opts ...clientv3.OpOption) clientv3.WatchChan { + assert.Equal(fw.t, "key1", key) + assert.EqualValues(fw.t, len(fw.opts), len(opts)) // TODO: how to assert this properly? + return fw.ch +} +func (fakeWatcher) Close() error { + panic("implement me") +} + +func TestEtcKeystore_Watch_Nil(t *testing.T) { + keystore := etcKeystore{ + Watcher: fakeWatcher{ + t: t, ch: nil, + opts: []clientv3.OpOption{clientv3.WithPrefix(), clientv3.WithPrevKV()}, + }, + } + + response := keystore.Watch(context.TODO(), "key1", true) + assert.Empty(t, response) } From 8afdee9a55cca5045d24a133910811abd0e48ba3 Mon Sep 17 00:00:00 2001 From: John Garbutt Date: Thu, 3 Jan 2019 11:25:14 +0000 Subject: [PATCH 07/22] Check all watches processed --- internal/pkg/etcdregistry/watch_test.go | 67 +++++++++++++++++++++++-- 1 file changed, 64 insertions(+), 3 deletions(-) diff --git a/internal/pkg/etcdregistry/watch_test.go b/internal/pkg/etcdregistry/watch_test.go index fed1b125..51629935 100644 --- a/internal/pkg/etcdregistry/watch_test.go +++ b/internal/pkg/etcdregistry/watch_test.go @@ -3,6 +3,7 @@ package etcdregistry import ( "context" "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/mvcc/mvccpb" "github.com/stretchr/testify/assert" "testing" ) @@ -14,7 +15,7 @@ type fakeWatcher struct { } func (fw fakeWatcher) Watch(ctx context.Context, key string, opts ...clientv3.OpOption) clientv3.WatchChan { - assert.Equal(fw.t, "key1", key) + assert.Equal(fw.t, "key", key) assert.EqualValues(fw.t, len(fw.opts), len(opts)) // TODO: how to assert this properly? return fw.ch } @@ -26,11 +27,71 @@ func TestEtcKeystore_Watch_Nil(t *testing.T) { keystore := etcKeystore{ Watcher: fakeWatcher{ t: t, ch: nil, - opts: []clientv3.OpOption{clientv3.WithPrefix(), clientv3.WithPrevKV()}, + opts: []clientv3.OpOption{clientv3.WithPrevKV()}, }, } - response := keystore.Watch(context.TODO(), "key1", true) + response := keystore.Watch(context.TODO(), "key", false) assert.Empty(t, response) } + +func TestEtcKeystore_Watch(t *testing.T) { + ch := make(chan clientv3.WatchResponse) + + keystore := etcKeystore{ + Watcher: fakeWatcher{ + t: t, ch: ch, + opts: []clientv3.OpOption{clientv3.WithPrefix(), clientv3.WithPrevKV()}, + }, + } + + go func(){ + ch <- clientv3.WatchResponse{ + Events: []*clientv3.Event{ + {Type: clientv3.EventTypePut, Kv: &mvccpb.KeyValue{Key:[]byte("key1")}}, + {Type: clientv3.EventTypePut, Kv: &mvccpb.KeyValue{Key:[]byte("key2")}}, + }} + ch <- clientv3.WatchResponse{ + Events: []*clientv3.Event{ + {Type: clientv3.EventTypePut, Kv: &mvccpb.KeyValue{CreateRevision:1}}, + }} + ch <- clientv3.WatchResponse{ + Events: []*clientv3.Event{ + {Type: clientv3.EventTypeDelete, Kv: &mvccpb.KeyValue{}}, + {Type: clientv3.EventTypeDelete, Kv: &mvccpb.KeyValue{}}, + }} + close(ch) + }() + + response := keystore.Watch(context.TODO(), "key", true) + + ev1 := <- response + assert.True(t, ev1.IsCreate) + assert.False(t, ev1.IsModify) + assert.False(t, ev1.IsDelete) + + ev2 := <- response + assert.True(t, ev2.IsCreate) + assert.False(t, ev2.IsModify) + assert.False(t, ev2.IsDelete) + + ev3 := <- response + assert.False(t, ev3.IsCreate) + assert.True(t, ev3.IsModify) + assert.False(t, ev3.IsDelete) + + ev4 := <- response + assert.False(t, ev4.IsCreate) + assert.False(t, ev4.IsModify) + assert.True(t, ev4.IsDelete) + + ev5 := <- response + assert.False(t, ev5.IsCreate) + assert.False(t, ev5.IsModify) + assert.True(t, ev5.IsDelete) + + // Check chan is closed + _, ok := <- response + assert.False(t, ok) +} From dfff499448fb21dabe169beb5729e3a0581b4180 Mon Sep 17 00:00:00 2001 From: John Garbutt Date: Thu, 3 Jan 2019 11:33:53 +0000 Subject: [PATCH 08/22] Ensure keys are correctly reported by watch --- internal/pkg/etcdregistry/watch_test.go | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/internal/pkg/etcdregistry/watch_test.go b/internal/pkg/etcdregistry/watch_test.go index 51629935..430f711a 100644 --- a/internal/pkg/etcdregistry/watch_test.go +++ b/internal/pkg/etcdregistry/watch_test.go @@ -54,12 +54,16 @@ func TestEtcKeystore_Watch(t *testing.T) { }} ch <- clientv3.WatchResponse{ Events: []*clientv3.Event{ - {Type: clientv3.EventTypePut, Kv: &mvccpb.KeyValue{CreateRevision:1}}, + { + Type: clientv3.EventTypePut, + Kv: &mvccpb.KeyValue{ModRevision:1, Key:[]byte("key2")}, + PrevKv: &mvccpb.KeyValue{ModRevision:1, Key:[]byte("key2")}, + }, }} ch <- clientv3.WatchResponse{ Events: []*clientv3.Event{ - {Type: clientv3.EventTypeDelete, Kv: &mvccpb.KeyValue{}}, - {Type: clientv3.EventTypeDelete, Kv: &mvccpb.KeyValue{}}, + {Type: clientv3.EventTypeDelete, PrevKv: &mvccpb.KeyValue{Key:[]byte("key2")}}, + {Type: clientv3.EventTypeDelete, PrevKv: &mvccpb.KeyValue{Key:[]byte("key1")}}, }} close(ch) }() @@ -70,26 +74,36 @@ func TestEtcKeystore_Watch(t *testing.T) { assert.True(t, ev1.IsCreate) assert.False(t, ev1.IsModify) assert.False(t, ev1.IsDelete) + assert.Nil(t, ev1.Old) + assert.EqualValues(t, "key1", ev1.New.Key) ev2 := <- response assert.True(t, ev2.IsCreate) assert.False(t, ev2.IsModify) assert.False(t, ev2.IsDelete) + assert.Nil(t, ev2.Old) + assert.EqualValues(t, "key2", ev2.New.Key) ev3 := <- response assert.False(t, ev3.IsCreate) assert.True(t, ev3.IsModify) assert.False(t, ev3.IsDelete) + assert.EqualValues(t, "key2", ev3.New.Key) + assert.EqualValues(t, "key2", ev3.Old.Key) ev4 := <- response assert.False(t, ev4.IsCreate) assert.False(t, ev4.IsModify) assert.True(t, ev4.IsDelete) + assert.Nil(t, ev4.New) + assert.EqualValues(t, "key2", ev4.Old.Key) ev5 := <- response assert.False(t, ev5.IsCreate) assert.False(t, ev5.IsModify) assert.True(t, ev5.IsDelete) + assert.Nil(t, ev5.New) + assert.EqualValues(t, "key1", ev5.Old.Key) // Check chan is closed _, ok := <- response From f8d1cdfa736ef5cb348704159281bb8baeeb0657 Mon Sep 17 00:00:00 2001 From: John Garbutt Date: Thu, 3 Jan 2019 11:42:46 +0000 Subject: [PATCH 09/22] Test error handing in watch --- internal/pkg/etcdregistry/watch_test.go | 40 +++++++++++++++---------- 1 file changed, 24 insertions(+), 16 deletions(-) diff --git a/internal/pkg/etcdregistry/watch_test.go b/internal/pkg/etcdregistry/watch_test.go index 430f711a..8dbddaa0 100644 --- a/internal/pkg/etcdregistry/watch_test.go +++ b/internal/pkg/etcdregistry/watch_test.go @@ -46,66 +46,74 @@ func TestEtcKeystore_Watch(t *testing.T) { }, } - go func(){ + go func() { ch <- clientv3.WatchResponse{ Events: []*clientv3.Event{ - {Type: clientv3.EventTypePut, Kv: &mvccpb.KeyValue{Key:[]byte("key1")}}, - {Type: clientv3.EventTypePut, Kv: &mvccpb.KeyValue{Key:[]byte("key2")}}, - }} + {Type: clientv3.EventTypePut, Kv: &mvccpb.KeyValue{Key: []byte("key1")}}, + {Type: clientv3.EventTypePut, Kv: &mvccpb.KeyValue{Key: []byte("key2")}}, + }} ch <- clientv3.WatchResponse{ Events: []*clientv3.Event{ { - Type: clientv3.EventTypePut, - Kv: &mvccpb.KeyValue{ModRevision:1, Key:[]byte("key2")}, - PrevKv: &mvccpb.KeyValue{ModRevision:1, Key:[]byte("key2")}, + Type: clientv3.EventTypePut, + Kv: &mvccpb.KeyValue{ModRevision: 1, Key: []byte("key2")}, + PrevKv: &mvccpb.KeyValue{ModRevision: 1, Key: []byte("key2")}, }, }} ch <- clientv3.WatchResponse{ Events: []*clientv3.Event{ - {Type: clientv3.EventTypeDelete, PrevKv: &mvccpb.KeyValue{Key:[]byte("key2")}}, - {Type: clientv3.EventTypeDelete, PrevKv: &mvccpb.KeyValue{Key:[]byte("key1")}}, + {Type: clientv3.EventTypeDelete, PrevKv: &mvccpb.KeyValue{Key: []byte("key2")}}, + {Type: clientv3.EventTypeDelete, PrevKv: &mvccpb.KeyValue{Key: []byte("key1")}}, }} + ch <- clientv3.WatchResponse{Canceled: true} close(ch) }() response := keystore.Watch(context.TODO(), "key", true) - ev1 := <- response + ev1 := <-response assert.True(t, ev1.IsCreate) assert.False(t, ev1.IsModify) assert.False(t, ev1.IsDelete) assert.Nil(t, ev1.Old) assert.EqualValues(t, "key1", ev1.New.Key) - ev2 := <- response + ev2 := <-response assert.True(t, ev2.IsCreate) assert.False(t, ev2.IsModify) assert.False(t, ev2.IsDelete) assert.Nil(t, ev2.Old) assert.EqualValues(t, "key2", ev2.New.Key) - ev3 := <- response + ev3 := <-response assert.False(t, ev3.IsCreate) assert.True(t, ev3.IsModify) assert.False(t, ev3.IsDelete) assert.EqualValues(t, "key2", ev3.New.Key) assert.EqualValues(t, "key2", ev3.Old.Key) - ev4 := <- response + ev4 := <-response assert.False(t, ev4.IsCreate) assert.False(t, ev4.IsModify) assert.True(t, ev4.IsDelete) assert.Nil(t, ev4.New) assert.EqualValues(t, "key2", ev4.Old.Key) - ev5 := <- response + ev5 := <-response assert.False(t, ev5.IsCreate) assert.False(t, ev5.IsModify) assert.True(t, ev5.IsDelete) assert.Nil(t, ev5.New) assert.EqualValues(t, "key1", ev5.Old.Key) - // Check chan is closed - _, ok := <- response + ev6 := <-response + assert.Equal(t, + "etcdserver: mvcc: required revision is a future revision", + ev6.Err.Error()) + + // Check channels are closed + _, ok := <-response + assert.False(t, ok) + _, ok = <-ch assert.False(t, ok) } From a33ce7412428045a2ecdc01d4f5fdf2ae45ad968 Mon Sep 17 00:00:00 2001 From: John Garbutt Date: Thu, 3 Jan 2019 12:36:35 +0000 Subject: [PATCH 10/22] Test GetNewHostBrickAllocations --- Makefile | 4 +- internal/pkg/keystoreregistry/pool.go | 31 +++++++++ internal/pkg/keystoreregistry/pool_test.go | 77 ++++++++++++++++++++++ 3 files changed, 110 insertions(+), 2 deletions(-) create mode 100644 internal/pkg/keystoreregistry/pool_test.go diff --git a/Makefile b/Makefile index a93ddc7a..1982b04b 100644 --- a/Makefile +++ b/Makefile @@ -11,7 +11,7 @@ # limitations under the License. -all: deps buildlocal test format +all: deps buildlocal format test buildlocal: mkdir -p `pwd`/bin @@ -24,8 +24,8 @@ format: test: mkdir -p `pwd`/bin ./build/rebuild_mocks.sh - go test -cover -race -coverprofile=./bin/coverage.txt ./... go vet ./... + go test -cover -race -coverprofile=./bin/coverage.txt ./... test-func: ./build/func_test.sh diff --git a/internal/pkg/keystoreregistry/pool.go b/internal/pkg/keystoreregistry/pool.go index 28f725b3..0cfa5609 100644 --- a/internal/pkg/keystoreregistry/pool.go +++ b/internal/pkg/keystoreregistry/pool.go @@ -325,6 +325,37 @@ func (poolRegistry *poolRegistry) WatchHostBrickAllocations(hostname string, }) } +func (poolRegistry *poolRegistry) GetNewHostBrickAllocations( + ctxt context.Context, hostname string) <-chan registry.BrickAllocation { + + events := make(chan registry.BrickAllocation) + + key := getPrefixAllocationHost(hostname) + rawEvents := poolRegistry.keystore.Watch(ctxt, key, true) + + go func() { + for raw := range rawEvents { + if raw.Err != nil { + // consider sending error back to the listener? For now force process restart + log.Panicf("Error when watching %s for new brick hosts: %+v", hostname, raw) + } + if raw.IsCreate { + newBrick := registry.BrickAllocation{} + err := json.Unmarshal(bytes.NewBufferString(raw.New.Value).Bytes(), &newBrick) + if err != nil { + log.Panicf("error parsing create brick host %s event %+v %s", hostname, raw, err) + } else { + events <- newBrick + } + } + } + // we get here if the context is canceled, etc + close(events) + }() + + return events +} + func (poolRegistry *poolRegistry) getBricks(prefix string) ([]registry.BrickInfo, error) { raw, err := poolRegistry.keystore.GetAll(prefix) if err != nil { diff --git a/internal/pkg/keystoreregistry/pool_test.go b/internal/pkg/keystoreregistry/pool_test.go new file mode 100644 index 00000000..22c01c8e --- /dev/null +++ b/internal/pkg/keystoreregistry/pool_test.go @@ -0,0 +1,77 @@ +package keystoreregistry + +import ( + "context" + "github.com/RSE-Cambridge/data-acc/internal/pkg/registry" + "github.com/stretchr/testify/assert" + "testing" +) + +type fakeKeystore struct { + watchChan KeyValueUpdateChan +} + +func (fakeKeystore) Close() error { + panic("implement me") +} +func (fakeKeystore) CleanPrefix(prefix string) error { + panic("implement me") +} +func (fakeKeystore) Add(keyValues []KeyValue) error { + panic("implement me") +} +func (fakeKeystore) Update(keyValues []KeyValueVersion) error { + panic("implement me") +} +func (fakeKeystore) DeleteAll(keyValues []KeyValueVersion) error { + panic("implement me") +} +func (fakeKeystore) GetAll(prefix string) ([]KeyValueVersion, error) { + panic("implement me") +} +func (fakeKeystore) Get(key string) (KeyValueVersion, error) { + panic("implement me") +} +func (fakeKeystore) WatchPrefix(prefix string, onUpdate func(old *KeyValueVersion, new *KeyValueVersion)) { + panic("implement me") +} +func (fakeKeystore) WatchKey(ctxt context.Context, key string, onUpdate func(old *KeyValueVersion, new *KeyValueVersion)) { + panic("implement me") +} +func (fk fakeKeystore) Watch(ctxt context.Context, key string, withPrefix bool) KeyValueUpdateChan { + return fk.watchChan +} +func (fakeKeystore) KeepAliveKey(key string) error { + panic("implement me") +} +func (fakeKeystore) NewMutex(lockKey string) (Mutex, error) { + panic("implement me") +} + +func TestPoolRegistry_GetNewHostBrickAllocations(t *testing.T) { + rawEvents := make(chan KeyValueUpdate) + reg := poolRegistry{keystore: &fakeKeystore{watchChan:rawEvents}} + + events := reg.GetNewHostBrickAllocations(context.TODO(), "host1") + + go func() { + rawEvents <- KeyValueUpdate{IsCreate:false} + rawEvents <- KeyValueUpdate{ + IsCreate:true, + New: &KeyValueVersion{Value: toJson(registry.BrickAllocation{ + Hostname:"host1", Device:"sdb", + })}, + } + rawEvents <- KeyValueUpdate{IsCreate:false} + close(rawEvents) + }() + + ev1 := <-events + assert.Equal(t, "host1", ev1.Hostname) + assert.Equal(t, "sdb", ev1.Device) + + _, ok := <-events + assert.False(t, ok) + _, ok = <-rawEvents + assert.False(t, ok) +} From 4e125c2f4c766b43f17779cddb9954cfce1f16b1 Mon Sep 17 00:00:00 2001 From: John Garbutt Date: Thu, 3 Jan 2019 13:10:47 +0000 Subject: [PATCH 11/22] Move to use GetNewHostBrickAllocations --- cmd/dac-func-test/pool.go | 23 ------------------- internal/pkg/keystoreregistry/pool.go | 20 ---------------- internal/pkg/keystoreregistry/pool_test.go | 10 ++++---- internal/pkg/lifecycle/brickmanager/bricks.go | 22 +++++++----------- internal/pkg/mocks/pool_mock.go | 15 +++++++----- internal/pkg/registry/pool.go | 8 +++++-- 6 files changed, 28 insertions(+), 70 deletions(-) diff --git a/cmd/dac-func-test/pool.go b/cmd/dac-func-test/pool.go index 25285569..b2536e13 100644 --- a/cmd/dac-func-test/pool.go +++ b/cmd/dac-func-test/pool.go @@ -54,28 +54,6 @@ func testGetBricks(poolRegistry registry.PoolRegistry) { } } -func testAllocateBricks(poolRegistry registry.PoolRegistry) { - poolRegistry.WatchHostBrickAllocations("foo", func(old *registry.BrickAllocation, - new *registry.BrickAllocation) { - log.Printf("**Allocation update. Old: %+v New: %+v", old, new) - if new.DeallocateRequested { - log.Printf("requested clean of: %d:%s", new.AllocatedIndex, new.Device) - } - }) - //allocations := []registry.BrickAllocation{ - // {Hostname: "foo", Device: "vbdb1", AllocatedVolume: "vol1"}, - // {Hostname: "foo", Device: "nvme3n1", AllocatedVolume: "vol1"}, - //} - // TODO: create a volume to get the bricks allocated from? - //if err := poolRegistry.AllocateBricks(allocations); err != nil { - // log.Fatal(err) - //} - if err := poolRegistry.DeallocateBricks("vol1"); err != nil { - log.Fatal(err) - } - log.Println("asdf") -} - func testGetAllocations(poolRegistry registry.PoolRegistry) { allocations, err := poolRegistry.GetAllocationsForHost("foo") if err != nil { @@ -132,7 +110,6 @@ func TestKeystorePoolRegistry(keystore keystoreregistry.Keystore) { poolRegistry := keystoreregistry.NewPoolRegistry(keystore) testUpdateHost(poolRegistry) testGetBricks(poolRegistry) - testAllocateBricks(poolRegistry) testGetAllocations(poolRegistry) testDeleteAllocations(poolRegistry) testKeepHostAlive(poolRegistry) diff --git a/internal/pkg/keystoreregistry/pool.go b/internal/pkg/keystoreregistry/pool.go index 0cfa5609..a9a78f44 100644 --- a/internal/pkg/keystoreregistry/pool.go +++ b/internal/pkg/keystoreregistry/pool.go @@ -305,26 +305,6 @@ func (poolRegistry *poolRegistry) GetBrickInfo(hostname string, device string) ( return value, error } -func (poolRegistry *poolRegistry) WatchHostBrickAllocations(hostname string, - callback func(old *registry.BrickAllocation, new *registry.BrickAllocation)) { - key := getPrefixAllocationHost(hostname) - poolRegistry.keystore.WatchPrefix(key, func(old *KeyValueVersion, new *KeyValueVersion) { - oldBrick := ®istry.BrickAllocation{} - newBrick := ®istry.BrickAllocation{} - if old != nil { - if old.Value != "" { - json.Unmarshal(bytes.NewBufferString(old.Value).Bytes(), &oldBrick) - } - } - if new != nil { - if new.Value != "" { - json.Unmarshal(bytes.NewBufferString(new.Value).Bytes(), &newBrick) - } - } - callback(oldBrick, newBrick) - }) -} - func (poolRegistry *poolRegistry) GetNewHostBrickAllocations( ctxt context.Context, hostname string) <-chan registry.BrickAllocation { diff --git a/internal/pkg/keystoreregistry/pool_test.go b/internal/pkg/keystoreregistry/pool_test.go index 22c01c8e..2965eff6 100644 --- a/internal/pkg/keystoreregistry/pool_test.go +++ b/internal/pkg/keystoreregistry/pool_test.go @@ -50,19 +50,19 @@ func (fakeKeystore) NewMutex(lockKey string) (Mutex, error) { func TestPoolRegistry_GetNewHostBrickAllocations(t *testing.T) { rawEvents := make(chan KeyValueUpdate) - reg := poolRegistry{keystore: &fakeKeystore{watchChan:rawEvents}} + reg := poolRegistry{keystore: &fakeKeystore{watchChan: rawEvents}} events := reg.GetNewHostBrickAllocations(context.TODO(), "host1") go func() { - rawEvents <- KeyValueUpdate{IsCreate:false} + rawEvents <- KeyValueUpdate{IsCreate: false} rawEvents <- KeyValueUpdate{ - IsCreate:true, + IsCreate: true, New: &KeyValueVersion{Value: toJson(registry.BrickAllocation{ - Hostname:"host1", Device:"sdb", + Hostname: "host1", Device: "sdb", })}, } - rawEvents <- KeyValueUpdate{IsCreate:false} + rawEvents <- KeyValueUpdate{IsCreate: false} close(rawEvents) }() diff --git a/internal/pkg/lifecycle/brickmanager/bricks.go b/internal/pkg/lifecycle/brickmanager/bricks.go index 52589f53..c5286836 100644 --- a/internal/pkg/lifecycle/brickmanager/bricks.go +++ b/internal/pkg/lifecycle/brickmanager/bricks.go @@ -1,6 +1,7 @@ package brickmanager import ( + "context" "github.com/RSE-Cambridge/data-acc/internal/pkg/pfsprovider/ansible" "github.com/RSE-Cambridge/data-acc/internal/pkg/registry" "log" @@ -10,20 +11,13 @@ import ( func setupBrickEventHandlers(poolRegistry registry.PoolRegistry, volumeRegistry registry.VolumeRegistry, hostname string) { - poolRegistry.WatchHostBrickAllocations(hostname, - func(old *registry.BrickAllocation, new *registry.BrickAllocation) { - // log.Println("Noticed brick allocation update. Old:", old, "New:", new) - if new.AllocatedVolume != "" && old.AllocatedVolume == "" && new.AllocatedIndex == 0 { - //log.Println("Dectected we host primary brick for:", - // new.AllocatedVolume, "Must check for action.") - processNewPrimaryBlock(poolRegistry, volumeRegistry, new) - } - if old.AllocatedVolume != "" { - if new.DeallocateRequested && !old.DeallocateRequested { - log.Printf("Requested clean of brick: %d:%s", new.AllocatedIndex, new.Device) - } - } - }) + newBricks := poolRegistry.GetNewHostBrickAllocations(context.Background(), hostname) + go func() { + for brick := range newBricks { + go processNewPrimaryBlock(poolRegistry, volumeRegistry, &brick) + } + log.Panic("we appear to have stopped watching for new bricks") + }() allocations, err := poolRegistry.GetAllocationsForHost(hostname) if err != nil { diff --git a/internal/pkg/mocks/pool_mock.go b/internal/pkg/mocks/pool_mock.go index 821a9b02..9b759f1b 100644 --- a/internal/pkg/mocks/pool_mock.go +++ b/internal/pkg/mocks/pool_mock.go @@ -5,6 +5,7 @@ package mocks import ( + context "context" registry "github.com/RSE-Cambridge/data-acc/internal/pkg/registry" gomock "github.com/golang/mock/gomock" reflect "reflect" @@ -146,12 +147,14 @@ func (mr *MockPoolRegistryMockRecorder) GetBrickInfo(hostname, device interface{ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetBrickInfo", reflect.TypeOf((*MockPoolRegistry)(nil).GetBrickInfo), hostname, device) } -// WatchHostBrickAllocations mocks base method -func (m *MockPoolRegistry) WatchHostBrickAllocations(hostname string, callback func(*registry.BrickAllocation, *registry.BrickAllocation)) { - m.ctrl.Call(m, "WatchHostBrickAllocations", hostname, callback) +// GetNewHostBrickAllocations mocks base method +func (m *MockPoolRegistry) GetNewHostBrickAllocations(ctxt context.Context, hostname string) <-chan registry.BrickAllocation { + ret := m.ctrl.Call(m, "GetNewHostBrickAllocations", ctxt, hostname) + ret0, _ := ret[0].(<-chan registry.BrickAllocation) + return ret0 } -// WatchHostBrickAllocations indicates an expected call of WatchHostBrickAllocations -func (mr *MockPoolRegistryMockRecorder) WatchHostBrickAllocations(hostname, callback interface{}) *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WatchHostBrickAllocations", reflect.TypeOf((*MockPoolRegistry)(nil).WatchHostBrickAllocations), hostname, callback) +// GetNewHostBrickAllocations indicates an expected call of GetNewHostBrickAllocations +func (mr *MockPoolRegistryMockRecorder) GetNewHostBrickAllocations(ctxt, hostname interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetNewHostBrickAllocations", reflect.TypeOf((*MockPoolRegistry)(nil).GetNewHostBrickAllocations), ctxt, hostname) } diff --git a/internal/pkg/registry/pool.go b/internal/pkg/registry/pool.go index 10aa21e2..09acd4d8 100644 --- a/internal/pkg/registry/pool.go +++ b/internal/pkg/registry/pool.go @@ -1,6 +1,7 @@ package registry import ( + "context" "encoding/json" "log" ) @@ -60,8 +61,11 @@ type PoolRegistry interface { // Get information on a specific brick GetBrickInfo(hostname string, device string) (BrickInfo, error) - // Register for callbacks when allocate or deallocate of a brick on the given host occurs - WatchHostBrickAllocations(hostname string, callback func(old *BrickAllocation, new *BrickAllocation)) + // Returns a channel that reports all new brick allocations for given hostname + // + // The channel is closed when the context is cancelled or timeout. + // Any errors in the watching log the issue and panic + GetNewHostBrickAllocations(ctxt context.Context, hostname string) <-chan BrickAllocation } type Pool struct { From aed2304cadf633421bf8e38def7ca375c08dafee Mon Sep 17 00:00:00 2001 From: John Garbutt Date: Thu, 3 Jan 2019 13:50:49 +0000 Subject: [PATCH 12/22] Only process the primary blocks --- internal/pkg/lifecycle/brickmanager/bricks.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/internal/pkg/lifecycle/brickmanager/bricks.go b/internal/pkg/lifecycle/brickmanager/bricks.go index c5286836..887d242c 100644 --- a/internal/pkg/lifecycle/brickmanager/bricks.go +++ b/internal/pkg/lifecycle/brickmanager/bricks.go @@ -14,7 +14,9 @@ func setupBrickEventHandlers(poolRegistry registry.PoolRegistry, volumeRegistry newBricks := poolRegistry.GetNewHostBrickAllocations(context.Background(), hostname) go func() { for brick := range newBricks { - go processNewPrimaryBlock(poolRegistry, volumeRegistry, &brick) + if brick.AllocatedIndex == 0 { + go processNewPrimaryBlock(poolRegistry, volumeRegistry, &brick) + } } log.Panic("we appear to have stopped watching for new bricks") }() From ad6cbe882f97801df4766bf4eb79baaf51622273 Mon Sep 17 00:00:00 2001 From: John Garbutt Date: Thu, 3 Jan 2019 13:56:43 +0000 Subject: [PATCH 13/22] Add extra logging --- internal/pkg/lifecycle/brickmanager/bricks.go | 3 +++ tools/slurm-test.sh | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/internal/pkg/lifecycle/brickmanager/bricks.go b/internal/pkg/lifecycle/brickmanager/bricks.go index 887d242c..8f67ee5d 100644 --- a/internal/pkg/lifecycle/brickmanager/bricks.go +++ b/internal/pkg/lifecycle/brickmanager/bricks.go @@ -15,7 +15,10 @@ func setupBrickEventHandlers(poolRegistry registry.PoolRegistry, volumeRegistry go func() { for brick := range newBricks { if brick.AllocatedIndex == 0 { + log.Printf("found new primary brick %+v", brick) go processNewPrimaryBlock(poolRegistry, volumeRegistry, &brick) + } else { + log.Printf("ignore block create, as it is not a primary brick %+v", brick) } } log.Panic("we appear to have stopped watching for new bricks") diff --git a/tools/slurm-test.sh b/tools/slurm-test.sh index 3d12d3ac..dd9ca01c 100755 --- a/tools/slurm-test.sh +++ b/tools/slurm-test.sh @@ -57,7 +57,7 @@ su centos -c 'sbatch use-persistent.sh' su centos -c 'sbatch use-persistent.sh' su centos -c 'sbatch use-persistent.sh' su centos -c 'sbatch use-persistent.sh' -su centos -c 'sbatch --array=1-10 test-persistent.sh' +su centos -c 'sbatch --array=1-10 use-persistent.sh' squeue From c0fb6136466dd9f0c156c6269895cb32580b991a Mon Sep 17 00:00:00 2001 From: John Garbutt Date: Thu, 3 Jan 2019 14:00:51 +0000 Subject: [PATCH 14/22] On restart always watch if we have a primary brick --- internal/pkg/lifecycle/brickmanager/bricks.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/internal/pkg/lifecycle/brickmanager/bricks.go b/internal/pkg/lifecycle/brickmanager/bricks.go index 8f67ee5d..f545e6b1 100644 --- a/internal/pkg/lifecycle/brickmanager/bricks.go +++ b/internal/pkg/lifecycle/brickmanager/bricks.go @@ -37,11 +37,11 @@ func setupBrickEventHandlers(poolRegistry registry.PoolRegistry, volumeRegistry if err != nil { log.Panicf("unable to find volume for allocation %+v", allocation) } - log.Println("We host a primary brick for:", volume.Name, volume) - if volume.State == registry.BricksProvisioned || volume.State == registry.DataInComplete { - log.Println("Start watch for changes to volume again:", volume.Name) - watchForVolumeChanges(poolRegistry, volumeRegistry, volume) - } + log.Printf("Start watching again, as we host a primary brick for: %+v", volume) + // TODO: do we finish watching correctly? + watchForVolumeChanges(poolRegistry, volumeRegistry, volume) + + // TODO: trigger events if we missed the "edge" already if volume.State == registry.DeleteRequested { log.Println("Complete pending delete request for volume:", volume.Name) processDelete(poolRegistry, volumeRegistry, volume) From 241c15c4e27967716e4e36c9445941a4f69be85e Mon Sep 17 00:00:00 2001 From: John Garbutt Date: Thu, 3 Jan 2019 14:02:28 +0000 Subject: [PATCH 15/22] Try find all volumes for any existing allocation --- internal/pkg/lifecycle/brickmanager/bricks.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/pkg/lifecycle/brickmanager/bricks.go b/internal/pkg/lifecycle/brickmanager/bricks.go index f545e6b1..80b38edd 100644 --- a/internal/pkg/lifecycle/brickmanager/bricks.go +++ b/internal/pkg/lifecycle/brickmanager/bricks.go @@ -32,11 +32,11 @@ func setupBrickEventHandlers(poolRegistry registry.PoolRegistry, volumeRegistry } for _, allocation := range allocations { + volume, err := volumeRegistry.Volume(allocation.AllocatedVolume) + if err != nil { + log.Panicf("unable to find volume for allocation %+v", allocation) + } if allocation.AllocatedIndex == 0 { - volume, err := volumeRegistry.Volume(allocation.AllocatedVolume) - if err != nil { - log.Panicf("unable to find volume for allocation %+v", allocation) - } log.Printf("Start watching again, as we host a primary brick for: %+v", volume) // TODO: do we finish watching correctly? watchForVolumeChanges(poolRegistry, volumeRegistry, volume) From 0b0886c85bd1054a9ed3d4a51f40cf3ba5256a10 Mon Sep 17 00:00:00 2001 From: John Garbutt Date: Thu, 3 Jan 2019 14:09:43 +0000 Subject: [PATCH 16/22] Do not leave allocations behind on volume delete --- internal/pkg/lifecycle/volume.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/internal/pkg/lifecycle/volume.go b/internal/pkg/lifecycle/volume.go index 67f04e6d..35f6ce1a 100644 --- a/internal/pkg/lifecycle/volume.go +++ b/internal/pkg/lifecycle/volume.go @@ -43,9 +43,16 @@ func (vlm *volumeLifecycleManager) Delete() error { // TODO convert errors into volume related errors, somewhere? log.Println("Deleting volume:", vlm.volume.Name, vlm.volume) - if vlm.volume.SizeBricks == 0 || vlm.volume.HadBricksAssigned == false { + if vlm.volume.SizeBricks == 0 { log.Println("No bricks to delete, skipping request delete bricks for:", vlm.volume.Name) - + } else if vlm.volume.HadBricksAssigned == false { + allocations, _ := vlm.poolRegistry.GetAllocationsForVolume(vlm.volume.Name) + if len(allocations) == 0 { + // TODO should we be holding a lock here? + log.Println("No bricks yet assigned, skip delete bricks.") + } else { + return fmt.Errorf("bricks assigned but dacd hasn't noticed them yet for: %+v", vlm.volume) + } } else { log.Printf("Requested delete of %d bricks for %s", vlm.volume.SizeBricks, vlm.volume.Name) err := vlm.volumeRegistry.UpdateState(vlm.volume.Name, registry.DeleteRequested) From 2b072c0221f31613c819c762c62e0a2043ae8ec5 Mon Sep 17 00:00:00 2001 From: John Garbutt Date: Thu, 3 Jan 2019 14:19:40 +0000 Subject: [PATCH 17/22] Add sleep into fake processing to induce more errors --- internal/pkg/pfsprovider/ansible/ansible.go | 1 + internal/pkg/pfsprovider/ansible/mount.go | 2 ++ tools/slurm-test.sh | 2 +- 3 files changed, 4 insertions(+), 1 deletion(-) diff --git a/internal/pkg/pfsprovider/ansible/ansible.go b/internal/pkg/pfsprovider/ansible/ansible.go index a776d0ce..0da39f1c 100644 --- a/internal/pkg/pfsprovider/ansible/ansible.go +++ b/internal/pkg/pfsprovider/ansible/ansible.go @@ -260,6 +260,7 @@ func executeAnsiblePlaybook(dir string, args string) error { skipAnsible := os.Getenv("DAC_SKIP_ANSIBLE") if skipAnsible == "True" { log.Println("Skip as DAC_SKIP_ANSIBLE=True") + time.Sleep(time.Millisecond * 200) return nil } diff --git a/internal/pkg/pfsprovider/ansible/mount.go b/internal/pkg/pfsprovider/ansible/mount.go index 37e30753..c498e96a 100644 --- a/internal/pkg/pfsprovider/ansible/mount.go +++ b/internal/pkg/pfsprovider/ansible/mount.go @@ -7,6 +7,7 @@ import ( "os" "os/exec" "path" + "time" ) func getMountDir(volume registry.Volume, jobName string) string { @@ -265,6 +266,7 @@ func (*run) Execute(hostname string, cmdStr string) error { skipAnsible := os.Getenv("DAC_SKIP_ANSIBLE") if skipAnsible == "True" { log.Println("Skip as DAC_SKIP_ANSIBLE=True") + time.Sleep(time.Millisecond * 200) return nil } diff --git a/tools/slurm-test.sh b/tools/slurm-test.sh index dd9ca01c..123f7758 100755 --- a/tools/slurm-test.sh +++ b/tools/slurm-test.sh @@ -53,11 +53,11 @@ squeue echo "***Use persistent buffer***" adduser centos cat use-persistent.sh +su centos -c 'sbatch --array=1-10 use-persistent.sh' su centos -c 'sbatch use-persistent.sh' su centos -c 'sbatch use-persistent.sh' su centos -c 'sbatch use-persistent.sh' su centos -c 'sbatch use-persistent.sh' -su centos -c 'sbatch --array=1-10 use-persistent.sh' squeue From 810e384528a613a757c0d84ba2865501bccd2324 Mon Sep 17 00:00:00 2001 From: John Garbutt Date: Thu, 3 Jan 2019 15:16:28 +0000 Subject: [PATCH 18/22] Add initial test for GetVolumeChanges --- internal/pkg/keystoreregistry/pool.go | 5 ++- internal/pkg/keystoreregistry/pool_test.go | 9 +++++ internal/pkg/keystoreregistry/volume.go | 38 ++++++++++++++++++++ internal/pkg/keystoreregistry/volume_test.go | 24 +++++++++++++ internal/pkg/mocks/volume_mock.go | 13 +++++++ internal/pkg/registry/volume.go | 16 +++++++++ 6 files changed, 104 insertions(+), 1 deletion(-) diff --git a/internal/pkg/keystoreregistry/pool.go b/internal/pkg/keystoreregistry/pool.go index a9a78f44..541b43e8 100644 --- a/internal/pkg/keystoreregistry/pool.go +++ b/internal/pkg/keystoreregistry/pool.go @@ -314,6 +314,10 @@ func (poolRegistry *poolRegistry) GetNewHostBrickAllocations( rawEvents := poolRegistry.keystore.Watch(ctxt, key, true) go func() { + defer close(events) + if rawEvents == nil { + return + } for raw := range rawEvents { if raw.Err != nil { // consider sending error back to the listener? For now force process restart @@ -330,7 +334,6 @@ func (poolRegistry *poolRegistry) GetNewHostBrickAllocations( } } // we get here if the context is canceled, etc - close(events) }() return events diff --git a/internal/pkg/keystoreregistry/pool_test.go b/internal/pkg/keystoreregistry/pool_test.go index 2965eff6..8ed65525 100644 --- a/internal/pkg/keystoreregistry/pool_test.go +++ b/internal/pkg/keystoreregistry/pool_test.go @@ -75,3 +75,12 @@ func TestPoolRegistry_GetNewHostBrickAllocations(t *testing.T) { _, ok = <-rawEvents assert.False(t, ok) } + +func TestPoolRegistry_GetNewHostBrickAllocations_nil(t *testing.T) { + reg := poolRegistry{keystore: &fakeKeystore{}} + + events := reg.GetNewHostBrickAllocations(context.TODO(), "host1") + + _, ok := <-events + assert.False(t, ok) +} diff --git a/internal/pkg/keystoreregistry/volume.go b/internal/pkg/keystoreregistry/volume.go index 9cd18f51..ee9a86ed 100644 --- a/internal/pkg/keystoreregistry/volume.go +++ b/internal/pkg/keystoreregistry/volume.go @@ -321,6 +321,44 @@ func (volRegistry *volumeRegistry) WatchVolumeChanges(volumeName string, return nil // TODO check key is present } +func (volRegistry *volumeRegistry) GetVolumeChanges(ctx context.Context, volume registry.Volume) registry.VolumeChangeChan { + // TODO: we should watch from the version of the passed in volume + key := getVolumeKey(string(volume.Name)) + rawEvents := volRegistry.keystore.Watch(ctx, key, false) + + events := make(chan registry.VolumeChange) + + go func() { + defer close(events) + if rawEvents == nil { + return + } + for rawEvent := range rawEvents { + if rawEvent.Err != nil { + events <- registry.VolumeChange{Err: rawEvent.Err} + continue + } + + event := registry.VolumeChange{ + IsDelete: rawEvent.IsDelete, + } + if rawEvent.Old != nil { + if err := volumeFromKeyValue(*rawEvent.Old, event.Old); err != nil { + rawEvent.Err = err + } + } + if rawEvent.New != nil { + if err := volumeFromKeyValue(*rawEvent.New, event.New); err != nil { + rawEvent.Err = err + } + } + events <- event + } + }() + + return events +} + func (volRegistry *volumeRegistry) WaitForState(volumeName registry.VolumeName, state registry.VolumeState) error { log.Println("Start waiting for volume", volumeName, "to reach state", state) err := volRegistry.WaitForCondition(volumeName, func(old *registry.Volume, new *registry.Volume) bool { diff --git a/internal/pkg/keystoreregistry/volume_test.go b/internal/pkg/keystoreregistry/volume_test.go index fb09f976..d21c1a5c 100644 --- a/internal/pkg/keystoreregistry/volume_test.go +++ b/internal/pkg/keystoreregistry/volume_test.go @@ -1,6 +1,7 @@ package keystoreregistry import ( + "context" "github.com/RSE-Cambridge/data-acc/internal/pkg/registry" "github.com/stretchr/testify/assert" "testing" @@ -69,3 +70,26 @@ func TestVolumeRegistry_MergeAttachments(t *testing.T) { assert.Equal(t, updates[1], result[1]) assert.Equal(t, oldAttachments[0], result[2]) } + +func TestVolumeRegistry_GetVolumeChanges_nil(t *testing.T) { + volReg := volumeRegistry{keystore: fakeKeystore{watchChan: nil}} + + changes := volReg.GetVolumeChanges(context.TODO(), registry.Volume{Name: "vol1"}) + + _, ok := <-changes + assert.False(t, ok) +} + +func TestVolumeRegistry_GetVolumeChanges(t *testing.T) { + raw := make(chan KeyValueUpdate) + volReg := volumeRegistry{keystore: fakeKeystore{watchChan: raw}} + + changes := volReg.GetVolumeChanges(context.TODO(), registry.Volume{Name: "vol1"}) + + close(raw) + + _, ok := <-changes + assert.False(t, ok) + _, ok = <-raw + assert.False(t, ok) +} diff --git a/internal/pkg/mocks/volume_mock.go b/internal/pkg/mocks/volume_mock.go index 109b482b..9ca2cc24 100644 --- a/internal/pkg/mocks/volume_mock.go +++ b/internal/pkg/mocks/volume_mock.go @@ -5,6 +5,7 @@ package mocks import ( + context "context" registry "github.com/RSE-Cambridge/data-acc/internal/pkg/registry" gomock "github.com/golang/mock/gomock" reflect "reflect" @@ -216,3 +217,15 @@ func (m *MockVolumeRegistry) WatchVolumeChanges(volumeName string, callback func func (mr *MockVolumeRegistryMockRecorder) WatchVolumeChanges(volumeName, callback interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WatchVolumeChanges", reflect.TypeOf((*MockVolumeRegistry)(nil).WatchVolumeChanges), volumeName, callback) } + +// GetVolumeChanges mocks base method +func (m *MockVolumeRegistry) GetVolumeChanges(ctx context.Context, volume registry.Volume) registry.VolumeChangeChan { + ret := m.ctrl.Call(m, "GetVolumeChanges", ctx, volume) + ret0, _ := ret[0].(registry.VolumeChangeChan) + return ret0 +} + +// GetVolumeChanges indicates an expected call of GetVolumeChanges +func (mr *MockVolumeRegistryMockRecorder) GetVolumeChanges(ctx, volume interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetVolumeChanges", reflect.TypeOf((*MockVolumeRegistry)(nil).GetVolumeChanges), ctx, volume) +} diff --git a/internal/pkg/registry/volume.go b/internal/pkg/registry/volume.go index f3146285..cb189a06 100644 --- a/internal/pkg/registry/volume.go +++ b/internal/pkg/registry/volume.go @@ -2,6 +2,7 @@ package registry import ( "bytes" + "context" "encoding/json" ) @@ -58,6 +59,21 @@ type VolumeRegistry interface { // If the volume is new, old = nil // used by the primary brick to get volume updates WatchVolumeChanges(volumeName string, callback func(old *Volume, new *Volume) bool) error + + // TODO: replacing WatchVolumeChanges + // Gets all changes that happen to the given volume + // + // To stop watching cancel or timeout the context, this will close the channel. + GetVolumeChanges(ctx context.Context, volume Volume) VolumeChangeChan +} + +type VolumeChangeChan <-chan VolumeChange + +type VolumeChange struct { + New *Volume + Old *Volume + IsDelete bool + Err error } // TODO: Attachment request, or session is probably a better name here... From 5a7d2b578bf8d9424326efdc18692414092fff10 Mon Sep 17 00:00:00 2001 From: John Garbutt Date: Thu, 3 Jan 2019 15:56:49 +0000 Subject: [PATCH 19/22] Test and fix up GetVolumeChanges --- internal/pkg/keystoreregistry/pool_test.go | 17 ++++++-- internal/pkg/keystoreregistry/volume.go | 16 ++++++-- internal/pkg/keystoreregistry/volume_test.go | 43 ++++++++++++++++++-- 3 files changed, 65 insertions(+), 11 deletions(-) diff --git a/internal/pkg/keystoreregistry/pool_test.go b/internal/pkg/keystoreregistry/pool_test.go index 8ed65525..7a71de58 100644 --- a/internal/pkg/keystoreregistry/pool_test.go +++ b/internal/pkg/keystoreregistry/pool_test.go @@ -8,7 +8,10 @@ import ( ) type fakeKeystore struct { - watchChan KeyValueUpdateChan + watchChan KeyValueUpdateChan + t *testing.T + key string + withPrefix bool } func (fakeKeystore) Close() error { @@ -39,6 +42,8 @@ func (fakeKeystore) WatchKey(ctxt context.Context, key string, onUpdate func(old panic("implement me") } func (fk fakeKeystore) Watch(ctxt context.Context, key string, withPrefix bool) KeyValueUpdateChan { + assert.Equal(fk.t, fk.key, key) + assert.Equal(fk.t, fk.withPrefix, withPrefix) return fk.watchChan } func (fakeKeystore) KeepAliveKey(key string) error { @@ -50,7 +55,9 @@ func (fakeKeystore) NewMutex(lockKey string) (Mutex, error) { func TestPoolRegistry_GetNewHostBrickAllocations(t *testing.T) { rawEvents := make(chan KeyValueUpdate) - reg := poolRegistry{keystore: &fakeKeystore{watchChan: rawEvents}} + reg := poolRegistry{keystore: &fakeKeystore{ + watchChan: rawEvents, t: t, key: "/bricks/allocated/host/host1/", withPrefix: true, + }} events := reg.GetNewHostBrickAllocations(context.TODO(), "host1") @@ -77,9 +84,11 @@ func TestPoolRegistry_GetNewHostBrickAllocations(t *testing.T) { } func TestPoolRegistry_GetNewHostBrickAllocations_nil(t *testing.T) { - reg := poolRegistry{keystore: &fakeKeystore{}} + reg := poolRegistry{keystore: &fakeKeystore{ + watchChan: nil, t: t, key: "/bricks/allocated/host/host2/", withPrefix: true, + }} - events := reg.GetNewHostBrickAllocations(context.TODO(), "host1") + events := reg.GetNewHostBrickAllocations(context.TODO(), "host2") _, ok := <-events assert.False(t, ok) diff --git a/internal/pkg/keystoreregistry/volume.go b/internal/pkg/keystoreregistry/volume.go index ee9a86ed..2fbebdc8 100644 --- a/internal/pkg/keystoreregistry/volume.go +++ b/internal/pkg/keystoreregistry/volume.go @@ -341,15 +341,23 @@ func (volRegistry *volumeRegistry) GetVolumeChanges(ctx context.Context, volume event := registry.VolumeChange{ IsDelete: rawEvent.IsDelete, + Old: nil, + New: nil, } if rawEvent.Old != nil { - if err := volumeFromKeyValue(*rawEvent.Old, event.Old); err != nil { - rawEvent.Err = err + oldVolume := ®istry.Volume{} + if err := volumeFromKeyValue(*rawEvent.Old, oldVolume); err != nil { + event.Err = err + } else { + event.Old = oldVolume } } if rawEvent.New != nil { - if err := volumeFromKeyValue(*rawEvent.New, event.New); err != nil { - rawEvent.Err = err + newVolume := ®istry.Volume{} + if err := volumeFromKeyValue(*rawEvent.New, newVolume); err != nil { + event.Err = err + } else { + event.New = newVolume } } events <- event diff --git a/internal/pkg/keystoreregistry/volume_test.go b/internal/pkg/keystoreregistry/volume_test.go index d21c1a5c..3af42647 100644 --- a/internal/pkg/keystoreregistry/volume_test.go +++ b/internal/pkg/keystoreregistry/volume_test.go @@ -72,7 +72,7 @@ func TestVolumeRegistry_MergeAttachments(t *testing.T) { } func TestVolumeRegistry_GetVolumeChanges_nil(t *testing.T) { - volReg := volumeRegistry{keystore: fakeKeystore{watchChan: nil}} + volReg := volumeRegistry{keystore: fakeKeystore{watchChan: nil, t: t, key: "/volume/vol1/"}} changes := volReg.GetVolumeChanges(context.TODO(), registry.Volume{Name: "vol1"}) @@ -82,11 +82,48 @@ func TestVolumeRegistry_GetVolumeChanges_nil(t *testing.T) { func TestVolumeRegistry_GetVolumeChanges(t *testing.T) { raw := make(chan KeyValueUpdate) - volReg := volumeRegistry{keystore: fakeKeystore{watchChan: raw}} + volReg := volumeRegistry{keystore: fakeKeystore{ + watchChan: raw, t: t, key: "/volume/vol1/", withPrefix: false, + }} changes := volReg.GetVolumeChanges(context.TODO(), registry.Volume{Name: "vol1"}) - close(raw) + vol := ®istry.Volume{Name: "test1"} + + go func() { + raw <- KeyValueUpdate{ + New: &KeyValueVersion{Key: "asdf", Value: toJson(vol)}, + Old: &KeyValueVersion{Key: "asdf", Value: toJson(vol)}, + } + raw <- KeyValueUpdate{ + IsDelete: true, + New: nil, + Old: &KeyValueVersion{Key: "asdf", Value: toJson(vol)}, + } + raw <- KeyValueUpdate{ + New: &KeyValueVersion{Key: "asdf", Value: "asdf"}, + Old: nil, + } + close(raw) + }() + + ch1 := <-changes + assert.Nil(t, ch1.Err) + assert.False(t, ch1.IsDelete) + assert.Equal(t, vol, ch1.Old) + assert.Equal(t, vol, ch1.New) + + ch2 := <-changes + assert.Nil(t, ch2.Err) + assert.True(t, ch2.IsDelete) + assert.Nil(t, ch2.New) + assert.Equal(t, vol, ch1.Old) + + ch3 := <-changes + assert.Equal(t, "invalid character 'a' looking for beginning of value", ch3.Err.Error()) + assert.False(t, ch3.IsDelete) + assert.Nil(t, ch3.Old) + assert.Nil(t, ch3.New) _, ok := <-changes assert.False(t, ok) From 2950fec43c524b6edd99196e2cd10302534975c1 Mon Sep 17 00:00:00 2001 From: John Garbutt Date: Thu, 3 Jan 2019 16:31:23 +0000 Subject: [PATCH 20/22] Move to get Volume changes --- cmd/dac-func-test/volume.go | 5 +-- internal/pkg/keystoreregistry/volume.go | 21 ---------- internal/pkg/lifecycle/brickmanager/bricks.go | 38 ++++++++++++++----- internal/pkg/mocks/volume_mock.go | 12 ------ internal/pkg/registry/volume.go | 6 --- 5 files changed, 29 insertions(+), 53 deletions(-) diff --git a/cmd/dac-func-test/volume.go b/cmd/dac-func-test/volume.go index e5ca63ab..138d5d56 100644 --- a/cmd/dac-func-test/volume.go +++ b/cmd/dac-func-test/volume.go @@ -19,10 +19,7 @@ func TestKeystoreVolumeRegistry(keystore keystoreregistry.Keystore) { } func testVolumeCRUD(volRegistry registry.VolumeRegistry) { - volRegistry.WatchVolumeChanges("asdf", func(old *registry.Volume, new *registry.Volume) bool { - log.Printf("Volume update detected. old: %s new: %s", old.State, new.State) - return false - }) + // TODO: test get volume changes? volume := registry.Volume{Name: "asdf", State: registry.Registered, JobName: "foo", SizeBricks: 2, SizeGB: 200} volume2 := registry.Volume{Name: "asdf2", JobName: "foo", SizeBricks: 3, SizeGB: 300} diff --git a/internal/pkg/keystoreregistry/volume.go b/internal/pkg/keystoreregistry/volume.go index 2fbebdc8..76c00d93 100644 --- a/internal/pkg/keystoreregistry/volume.go +++ b/internal/pkg/keystoreregistry/volume.go @@ -300,27 +300,6 @@ func (volRegistry *volumeRegistry) DeleteVolume(name registry.VolumeName) error return volRegistry.keystore.DeleteAll([]KeyValueVersion{keyValue}) } -func (volRegistry *volumeRegistry) WatchVolumeChanges(volumeName string, - callback func(old *registry.Volume, new *registry.Volume) bool) error { - key := getVolumeKey(volumeName) - ctxt, cancelFunc := context.WithCancel(context.Background()) - volRegistry.keystore.WatchKey(ctxt, key, func(old *KeyValueVersion, new *KeyValueVersion) { - oldVolume := ®istry.Volume{} - newVolume := ®istry.Volume{} - if old != nil { - volumeFromKeyValue(*old, oldVolume) - } - if new != nil { - volumeFromKeyValue(*new, newVolume) - } - if callback(oldVolume, newVolume) { - log.Println("stopping watching volume", volumeName) - cancelFunc() - } - }) - return nil // TODO check key is present -} - func (volRegistry *volumeRegistry) GetVolumeChanges(ctx context.Context, volume registry.Volume) registry.VolumeChangeChan { // TODO: we should watch from the version of the passed in volume key := getVolumeKey(string(volume.Name)) diff --git a/internal/pkg/lifecycle/brickmanager/bricks.go b/internal/pkg/lifecycle/brickmanager/bricks.go index 80b38edd..3ccc566b 100644 --- a/internal/pkg/lifecycle/brickmanager/bricks.go +++ b/internal/pkg/lifecycle/brickmanager/bricks.go @@ -18,7 +18,7 @@ func setupBrickEventHandlers(poolRegistry registry.PoolRegistry, volumeRegistry log.Printf("found new primary brick %+v", brick) go processNewPrimaryBlock(poolRegistry, volumeRegistry, &brick) } else { - log.Printf("ignore block create, as it is not a primary brick %+v", brick) + log.Printf("ignore brick create, as it is not a primary brick %+v", brick) } } log.Panic("we appear to have stopped watching for new bricks") @@ -71,9 +71,30 @@ func processNewPrimaryBlock(poolRegistry registry.PoolRegistry, volumeRegistry r func watchForVolumeChanges(poolRegistry registry.PoolRegistry, volumeRegistry registry.VolumeRegistry, volume registry.Volume) { - // TODO: watch from version associated with above volume to avoid any missed events - volumeRegistry.WatchVolumeChanges(string(volume.Name), func(old *registry.Volume, new *registry.Volume) bool { - if old != nil && new != nil { + ctxt, cancelFunc := context.WithCancel(context.Background()) + changes := volumeRegistry.GetVolumeChanges(ctxt, volume) + + go func() { + defer cancelFunc() + + for change := range changes { + old := change.Old + new := change.New + + if change.IsDelete { + log.Printf("Stop watching volume: %s", volume.Name) + return + } + + if old == nil || new == nil { + log.Printf("nil volume seen, unable to process volume event: %+v", change) + } + + if change.Err != nil { + log.Printf("Error while waiting for volume %s saw error: %s with: %+v", + volume.Name, change.Err.Error(), change) + } + if new.State != old.State { switch new.State { case registry.DataInRequested: @@ -83,11 +104,10 @@ func watchForVolumeChanges(poolRegistry registry.PoolRegistry, volumeRegistry re case registry.DeleteRequested: processDelete(poolRegistry, volumeRegistry, *new) case registry.BricksDeleted: - log.Println("Volume", new.Name, "deleted, stop listening for events now.") - return true + log.Println("Volume", new.Name, "has had bricks deleted.") default: // Ignore the state changes we triggered - log.Println(". ingore volume:", volume.Name, "state move:", old.State, "->", new.State) + log.Printf("ignore volume:%s state move: %s -> %s", new.Name, old.State, new.State) } } @@ -123,9 +143,7 @@ func watchForVolumeChanges(poolRegistry registry.PoolRegistry, volumeRegistry re } } } - // keep watching - return false - }) + }() } func handleError(volumeRegistry registry.VolumeRegistry, volume registry.Volume, err error) { diff --git a/internal/pkg/mocks/volume_mock.go b/internal/pkg/mocks/volume_mock.go index 9ca2cc24..46d37db7 100644 --- a/internal/pkg/mocks/volume_mock.go +++ b/internal/pkg/mocks/volume_mock.go @@ -206,18 +206,6 @@ func (mr *MockVolumeRegistryMockRecorder) WaitForCondition(volumeName, condition return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WaitForCondition", reflect.TypeOf((*MockVolumeRegistry)(nil).WaitForCondition), volumeName, condition) } -// WatchVolumeChanges mocks base method -func (m *MockVolumeRegistry) WatchVolumeChanges(volumeName string, callback func(*registry.Volume, *registry.Volume) bool) error { - ret := m.ctrl.Call(m, "WatchVolumeChanges", volumeName, callback) - ret0, _ := ret[0].(error) - return ret0 -} - -// WatchVolumeChanges indicates an expected call of WatchVolumeChanges -func (mr *MockVolumeRegistryMockRecorder) WatchVolumeChanges(volumeName, callback interface{}) *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WatchVolumeChanges", reflect.TypeOf((*MockVolumeRegistry)(nil).WatchVolumeChanges), volumeName, callback) -} - // GetVolumeChanges mocks base method func (m *MockVolumeRegistry) GetVolumeChanges(ctx context.Context, volume registry.Volume) registry.VolumeChangeChan { ret := m.ctrl.Call(m, "GetVolumeChanges", ctx, volume) diff --git a/internal/pkg/registry/volume.go b/internal/pkg/registry/volume.go index cb189a06..0a5893a8 100644 --- a/internal/pkg/registry/volume.go +++ b/internal/pkg/registry/volume.go @@ -55,12 +55,6 @@ type VolumeRegistry interface { // TODO: remove wait for state? WaitForCondition(volumeName VolumeName, condition func(old *Volume, new *Volume) bool) error - // Get all callback on all volume changes - // If the volume is new, old = nil - // used by the primary brick to get volume updates - WatchVolumeChanges(volumeName string, callback func(old *Volume, new *Volume) bool) error - - // TODO: replacing WatchVolumeChanges // Gets all changes that happen to the given volume // // To stop watching cancel or timeout the context, this will close the channel. From b0727f485454c70f3c49c68ebe374c410dcbe853 Mon Sep 17 00:00:00 2001 From: John Garbutt Date: Thu, 3 Jan 2019 16:35:34 +0000 Subject: [PATCH 21/22] Tweak volume state move logs --- internal/pkg/lifecycle/brickmanager/bricks.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/internal/pkg/lifecycle/brickmanager/bricks.go b/internal/pkg/lifecycle/brickmanager/bricks.go index 3ccc566b..a93d860f 100644 --- a/internal/pkg/lifecycle/brickmanager/bricks.go +++ b/internal/pkg/lifecycle/brickmanager/bricks.go @@ -96,6 +96,7 @@ func watchForVolumeChanges(poolRegistry registry.PoolRegistry, volumeRegistry re } if new.State != old.State { + log.Printf("volume:%s state move: %s -> %s", new.Name, old.State, new.State) switch new.State { case registry.DataInRequested: processDataIn(volumeRegistry, *new) @@ -107,7 +108,7 @@ func watchForVolumeChanges(poolRegistry registry.PoolRegistry, volumeRegistry re log.Println("Volume", new.Name, "has had bricks deleted.") default: // Ignore the state changes we triggered - log.Printf("ignore volume:%s state move: %s -> %s", new.Name, old.State, new.State) + log.Printf("ignore volume state move %+v", change) } } From 27ec3bc50f43f73ac000518bea0f025320b6f7ed Mon Sep 17 00:00:00 2001 From: John Garbutt Date: Thu, 3 Jan 2019 16:47:45 +0000 Subject: [PATCH 22/22] Remove old TODO --- internal/pkg/keystoreregistry/keystore.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/internal/pkg/keystoreregistry/keystore.go b/internal/pkg/keystoreregistry/keystore.go index e747f076..347365e3 100644 --- a/internal/pkg/keystoreregistry/keystore.go +++ b/internal/pkg/keystoreregistry/keystore.go @@ -63,9 +63,6 @@ type Keystore interface { // is deleted, or you stop watching after some timeout. Watch(ctxt context.Context, key string, withPrefix bool) KeyValueUpdateChan - // TODO: WIP to replce above watch functions - // WatchForCondition(ctxt context.Context, key string, fromRevision int64, check func(update KeyValueUpdate) bool) (bool, error) - // Add a key, and remove it when calling process dies // Error is returned if the key already exists KeepAliveKey(key string) error