From d1e97101c9b76ada5a6c2c1ebddeb20247faa58f Mon Sep 17 00:00:00 2001 From: Daniel Cibrao Date: Wed, 26 Oct 2022 16:45:20 +0100 Subject: [PATCH 1/2] feat: Prevent deletes in consumers and streams Adding a `preventDelete` property for streams and consumers CRDS which when set to true nack should be able to delete it, otherwise it should just ignore that stream or consumer. The use case for this is in a multi-cloud context when we have deploy deployed in different clouds it becomes troublesome. For example, nack on GKE might check that a stream CRD is missing and proceeds to delete it, however that CRD exists in AWS and AKS and it should not be deleted because its going to impact other applications. --- controllers/jetstream/consumer.go | 5 +++++ controllers/jetstream/stream.go | 5 +++++ deploy/crds.yml | 8 +++++++ .../apis/jetstream/v1beta2/consumertypes.go | 1 + .../apis/jetstream/v1beta2/streamtypes.go | 1 + .../v1beta2/zz_generated.deepcopy.go | 21 +++++++++++++++++++ .../clientset/versioned/fake/register.go | 14 ++++++------- .../clientset/versioned/scheme/register.go | 14 ++++++------- 8 files changed, 55 insertions(+), 14 deletions(-) diff --git a/controllers/jetstream/consumer.go b/controllers/jetstream/consumer.go index 7bf012eb..3bf9cc72 100644 --- a/controllers/jetstream/consumer.go +++ b/controllers/jetstream/consumer.go @@ -387,6 +387,11 @@ func deleteConsumer(ctx context.Context, c jsmClient, spec apis.ConsumerSpec) (e } }() + if spec.PreventDelete { + fmt.Printf("Consumer %q is configured to preventDelete on stream %q:\n", stream, consumer) + return nil + } + var apierr jsmapi.ApiError cn, err := c.LoadConsumer(ctx, stream, consumer) if errors.As(err, &apierr) && apierr.NotFoundError() { diff --git a/controllers/jetstream/stream.go b/controllers/jetstream/stream.go index ef9b1792..364f351c 100644 --- a/controllers/jetstream/stream.go +++ b/controllers/jetstream/stream.go @@ -397,6 +397,11 @@ func deleteStream(ctx context.Context, c jsmClient, spec apis.StreamSpec) (err e } }() + if spec.PreventDelete { + fmt.Printf("Stream %q is configured to preventDelete:\n", name) + return nil + } + var apierr jsmapi.ApiError str, err := c.LoadStream(ctx, name) if errors.As(err, &apierr) && apierr.NotFoundError() { diff --git a/deploy/crds.yml b/deploy/crds.yml index f7b06a68..a8b177b1 100644 --- a/deploy/crds.yml +++ b/deploy/crds.yml @@ -189,6 +189,10 @@ spec: source: type: string description: Messages will be published from that subject to the destination subject. + preventDelete: + description: Indicates if a Stream can be deleted (true) or not (false) + type: boolean + default: false status: type: object properties: @@ -551,6 +555,10 @@ spec: description: Name of the account to which the Consumer belongs. type: string pattern: '^[^.*>]*$' + preventDelete: + description: Indicates if a Consumer can be deleted (true) or not (false) + type: boolean + default: false status: type: object properties: diff --git a/pkg/jetstream/apis/jetstream/v1beta2/consumertypes.go b/pkg/jetstream/apis/jetstream/v1beta2/consumertypes.go index dac2bddc..4dacc458 100644 --- a/pkg/jetstream/apis/jetstream/v1beta2/consumertypes.go +++ b/pkg/jetstream/apis/jetstream/v1beta2/consumertypes.go @@ -30,6 +30,7 @@ type ConsumerSpec struct { DeliverPolicy string `json:"deliverPolicy"` DeliverSubject string `json:"deliverSubject"` Description string `json:"description"` + PreventDelete bool `json:"preventDelete"` DurableName string `json:"durableName"` FilterSubject string `json:"filterSubject"` FlowControl bool `json:"flowControl"` diff --git a/pkg/jetstream/apis/jetstream/v1beta2/streamtypes.go b/pkg/jetstream/apis/jetstream/v1beta2/streamtypes.go index df85f775..435f4ced 100644 --- a/pkg/jetstream/apis/jetstream/v1beta2/streamtypes.go +++ b/pkg/jetstream/apis/jetstream/v1beta2/streamtypes.go @@ -25,6 +25,7 @@ type StreamSpec struct { Account string `json:"account"` Creds string `json:"creds"` Description string `json:"description"` + PreventDelete bool `json:"preventDelete"` Discard string `json:"discard"` DuplicateWindow string `json:"duplicateWindow"` MaxAge string `json:"maxAge"` diff --git a/pkg/jetstream/apis/jetstream/v1beta2/zz_generated.deepcopy.go b/pkg/jetstream/apis/jetstream/v1beta2/zz_generated.deepcopy.go index 81a0d079..05dd69ee 100644 --- a/pkg/jetstream/apis/jetstream/v1beta2/zz_generated.deepcopy.go +++ b/pkg/jetstream/apis/jetstream/v1beta2/zz_generated.deepcopy.go @@ -399,6 +399,11 @@ func (in *StreamSpec) DeepCopyInto(out *StreamSpec) { *out = new(StreamPlacement) (*in).DeepCopyInto(*out) } + if in.Republish != nil { + in, out := &in.Republish, &out.Republish + *out = new(StreamSubjectMapping) + **out = **in + } if in.Servers != nil { in, out := &in.Servers, &out.Servers *out = make([]string, len(*in)) @@ -434,6 +439,22 @@ func (in *StreamSpec) DeepCopy() *StreamSpec { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *StreamSubjectMapping) DeepCopyInto(out *StreamSubjectMapping) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new StreamSubjectMapping. +func (in *StreamSubjectMapping) DeepCopy() *StreamSubjectMapping { + if in == nil { + return nil + } + out := new(StreamSubjectMapping) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *TLS) DeepCopyInto(out *TLS) { *out = *in diff --git a/pkg/jetstream/generated/clientset/versioned/fake/register.go b/pkg/jetstream/generated/clientset/versioned/fake/register.go index f7385bec..f15e342d 100644 --- a/pkg/jetstream/generated/clientset/versioned/fake/register.go +++ b/pkg/jetstream/generated/clientset/versioned/fake/register.go @@ -34,14 +34,14 @@ var localSchemeBuilder = runtime.SchemeBuilder{ // AddToScheme adds all types of this clientset into the given scheme. This allows composition // of clientsets, like in: // -// import ( -// "k8s.io/client-go/kubernetes" -// clientsetscheme "k8s.io/client-go/kubernetes/scheme" -// aggregatorclientsetscheme "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/scheme" -// ) +// import ( +// "k8s.io/client-go/kubernetes" +// clientsetscheme "k8s.io/client-go/kubernetes/scheme" +// aggregatorclientsetscheme "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/scheme" +// ) // -// kclientset, _ := kubernetes.NewForConfig(c) -// _ = aggregatorclientsetscheme.AddToScheme(clientsetscheme.Scheme) +// kclientset, _ := kubernetes.NewForConfig(c) +// _ = aggregatorclientsetscheme.AddToScheme(clientsetscheme.Scheme) // // After this, RawExtensions in Kubernetes types will serialize kube-aggregator types // correctly. diff --git a/pkg/jetstream/generated/clientset/versioned/scheme/register.go b/pkg/jetstream/generated/clientset/versioned/scheme/register.go index ce9e4a83..c80af823 100644 --- a/pkg/jetstream/generated/clientset/versioned/scheme/register.go +++ b/pkg/jetstream/generated/clientset/versioned/scheme/register.go @@ -34,14 +34,14 @@ var localSchemeBuilder = runtime.SchemeBuilder{ // AddToScheme adds all types of this clientset into the given scheme. This allows composition // of clientsets, like in: // -// import ( -// "k8s.io/client-go/kubernetes" -// clientsetscheme "k8s.io/client-go/kubernetes/scheme" -// aggregatorclientsetscheme "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/scheme" -// ) +// import ( +// "k8s.io/client-go/kubernetes" +// clientsetscheme "k8s.io/client-go/kubernetes/scheme" +// aggregatorclientsetscheme "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/scheme" +// ) // -// kclientset, _ := kubernetes.NewForConfig(c) -// _ = aggregatorclientsetscheme.AddToScheme(clientsetscheme.Scheme) +// kclientset, _ := kubernetes.NewForConfig(c) +// _ = aggregatorclientsetscheme.AddToScheme(clientsetscheme.Scheme) // // After this, RawExtensions in Kubernetes types will serialize kube-aggregator types // correctly. From 2f514ec1fbc64a618544cf91dfb375af3178eb1d Mon Sep 17 00:00:00 2001 From: Caleb Lloyd <2414837+caleblloyd@users.noreply.github.com> Date: Thu, 27 Oct 2022 09:38:48 -0400 Subject: [PATCH 2/2] update preventDelete descriptions --- deploy/crds.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/deploy/crds.yml b/deploy/crds.yml index a8b177b1..7be1cdb2 100644 --- a/deploy/crds.yml +++ b/deploy/crds.yml @@ -190,7 +190,7 @@ spec: type: string description: Messages will be published from that subject to the destination subject. preventDelete: - description: Indicates if a Stream can be deleted (true) or not (false) + description: When true, the managed Stream will not be deleted when the resource is deleted type: boolean default: false status: @@ -556,7 +556,7 @@ spec: type: string pattern: '^[^.*>]*$' preventDelete: - description: Indicates if a Consumer can be deleted (true) or not (false) + description: When true, the managed Consumer will not be deleted when the resource is deleted type: boolean default: false status: