Skip to content
This repository has been archived by the owner on Jun 4, 2021. It is now read-only.

Commit

Permalink
Deleting EventTypes when changing sink to a non-Broker kind (#420)
Browse files Browse the repository at this point in the history
* Adding knative-sources namespace to gcppubsub's ServiceAccount.

* Fixing bug introduced during a refactor.
GitHub webhooks cannot be created otherwise.

* Deleting eventTypes for non-Broker sinks

* more UTs

* fixing test
  • Loading branch information
nachocano authored and knative-prow-robot committed May 13, 2019
1 parent 1510750 commit b6cdf39
Show file tree
Hide file tree
Showing 8 changed files with 195 additions and 44 deletions.
14 changes: 6 additions & 8 deletions contrib/awssqs/pkg/reconciler/awssqssource.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,15 +148,12 @@ func (r *reconciler) Reconcile(ctx context.Context, object runtime.Object) error
}
src.Status.MarkDeployed()

// Only create EventTypes for Broker sinks.
if src.Spec.Sink.Kind == "Broker" {
err = r.reconcileEventTypes(ctx, src)
if err != nil {
logger.Error("Unable to reconcile the event types", zap.Error(err))
return err
}
src.Status.MarkEventTypes()
err = r.reconcileEventTypes(ctx, src)
if err != nil {
logger.Error("Unable to reconcile the event types", zap.Error(err))
return err
}
src.Status.MarkEventTypes()

return nil
}
Expand Down Expand Up @@ -233,6 +230,7 @@ func (r *reconciler) newEventTypeReconcilerArgs(src *v1alpha1.AwsSqsSource) *eve
Specs: specs,
Namespace: src.Namespace,
Labels: getLabels(src),
Kind: src.Spec.Sink.Kind,
}
}

Expand Down
14 changes: 6 additions & 8 deletions contrib/gcppubsub/pkg/reconciler/gcppubsubsource.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,15 +179,12 @@ func (r *reconciler) Reconcile(ctx context.Context, object runtime.Object) error
}
src.Status.MarkDeployed()

// Only create EventTypes for Broker sinks.
if src.Spec.Sink.Kind == "Broker" {
err = r.reconcileEventTypes(ctx, src)
if err != nil {
logger.Error("Unable to reconcile the event types", zap.Error(err))
return err
}
src.Status.MarkEventTypes()
err = r.reconcileEventTypes(ctx, src)
if err != nil {
logger.Error("Unable to reconcile the event types", zap.Error(err))
return err
}
src.Status.MarkEventTypes()

return nil
}
Expand Down Expand Up @@ -323,6 +320,7 @@ func (r *reconciler) newEventTypeReconcilerArgs(src *v1alpha1.GcpPubSubSource) *
Specs: specs,
Namespace: src.Namespace,
Labels: getLabels(src),
Kind: src.Spec.Sink.Kind,
}
}

Expand Down
33 changes: 29 additions & 4 deletions contrib/gcppubsub/pkg/reconciler/gcppubsubsource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
eventingsourcesv1alpha1 "github.com/knative/eventing/pkg/apis/sources/v1alpha1"
duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1"
v1 "k8s.io/api/apps/v1"
"k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
Expand Down Expand Up @@ -269,7 +269,7 @@ func TestReconcile(t *testing.T) {
getAddressableWithName(transformerAddressableName),
},
WantPresent: []runtime.Object{
getReadySource(),
getReadyAndMarkEventTypeSource(),
},
}, {
Name: "successful create - reuse existing receive adapter",
Expand All @@ -287,7 +287,7 @@ func TestReconcile(t *testing.T) {
},
},
WantPresent: []runtime.Object{
getReadySource(),
getReadyAndMarkEventTypeSource(),
},
}, {
Name: "successful create event types",
Expand All @@ -300,6 +300,20 @@ func TestReconcile(t *testing.T) {
getReadyAndMarkEventTypeSourceWithKind(brokerKind),
getEventType(),
},
}, {
Name: "successful delete event types",
InitialState: []runtime.Object{
getSource(),
getAddressable(),
getAddressableWithName(transformerAddressableName),
getEventTypeForSource("name-1", getSource()),
},
WantPresent: []runtime.Object{
getReadyAndMarkEventTypeSource(),
},
WantAbsent: []runtime.Object{
getEventTypeForSource("name-1", getSource()),
},
}, {
Name: "cannot create event types",
InitialState: []runtime.Object{
Expand Down Expand Up @@ -411,6 +425,10 @@ func getSourceWithKind(kind string) *sourcesv1alpha1.GcpPubSubSource {
}

func getEventType() *eventingv1alpha1.EventType {
return getEventTypeForSource("", getSourceWithKind(brokerKind))
}

func getEventTypeForSource(name string, src *sourcesv1alpha1.GcpPubSubSource) *eventingv1alpha1.EventType {
return &eventingv1alpha1.EventType{
TypeMeta: metav1.TypeMeta{
APIVersion: eventingv1alpha1.SchemeGroupVersion.String(),
Expand All @@ -427,9 +445,10 @@ func getEventType() *eventingv1alpha1.EventType {
UID: sourceUID,
},
},
Name: name,
GenerateName: fmt.Sprintf("%s-", sourcesv1alpha1.GcpPubSubSourceEventType),
Namespace: testNS,
Labels: getLabels(getSourceWithKind(brokerKind)),
Labels: getLabels(src),
},
Spec: eventingv1alpha1.EventTypeSpec{
Type: sourcesv1alpha1.GcpPubSubSourceEventType,
Expand Down Expand Up @@ -508,6 +527,12 @@ func getReadySourceWithKind(kind string) *sourcesv1alpha1.GcpPubSubSource {
return src
}

func getReadyAndMarkEventTypeSource() *sourcesv1alpha1.GcpPubSubSource {
src := getReadySource()
src.Status.MarkEventTypes()
return src
}

func getReadyAndMarkEventTypeSourceWithKind(kind string) *sourcesv1alpha1.GcpPubSubSource {
src := getReadySourceWithKind(kind)
src.Status.MarkEventTypes()
Expand Down
14 changes: 6 additions & 8 deletions contrib/kafka/pkg/reconciler/kafkasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,15 +112,12 @@ func (r *reconciler) Reconcile(ctx context.Context, object runtime.Object) error
}
src.Status.MarkDeployed()

// Only create EventTypes for Broker sinks.
if src.Spec.Sink.Kind == "Broker" {
err = r.reconcileEventTypes(ctx, src)
if err != nil {
logger.Error("Unable to reconcile the event types", zap.Error(err))
return err
}
src.Status.MarkEventTypes()
err = r.reconcileEventTypes(ctx, src)
if err != nil {
logger.Error("Unable to reconcile the event types", zap.Error(err))
return err
}
src.Status.MarkEventTypes()

return nil
}
Expand Down Expand Up @@ -199,6 +196,7 @@ func (r *reconciler) newEventTypeReconcilerArgs(src *v1alpha1.KafkaSource) *even
Specs: specs,
Namespace: src.Namespace,
Labels: getLabels(src),
Kind: src.Spec.Sink.Kind,
}
}

Expand Down
79 changes: 70 additions & 9 deletions contrib/kafka/pkg/reconciler/kafkasource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
eventingsourcesv1alpha1 "github.com/knative/eventing/pkg/apis/sources/v1alpha1"
duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1"
v1 "k8s.io/api/apps/v1"
"k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
Expand Down Expand Up @@ -142,7 +142,7 @@ func TestReconcile(t *testing.T) {
getAddressable(),
},
WantPresent: []runtime.Object{
getReadySource(),
getReadySourceAndMarkEventTypes(),
},
}, {
Name: "successful create - reuse existing receive adapter",
Expand All @@ -159,7 +159,7 @@ func TestReconcile(t *testing.T) {
},
},
WantPresent: []runtime.Object{
getReadySource(),
getReadySourceAndMarkEventTypes(),
},
}, {
Name: "successful create event types",
Expand All @@ -185,8 +185,59 @@ func TestReconcile(t *testing.T) {
},
WantPresent: []runtime.Object{
getReadyAndMarkEventTypesSourceWithKind(brokerKind),
getEventType("name1", "group", "topic1"),
getEventType("name2", "group", "topic2"),
getEventType("name1", "topic1"),
getEventType("name2", "topic2"),
},
}, {
Name: "successful create missing event types",
InitialState: []runtime.Object{
getSourceWithKind(brokerKind),
getAddressableWithKind(brokerKind),
getEventType("name2", "topic2"),
getEventType("name3", "whatever_topic"),
},
Mocks: controllertesting.Mocks{
MockCreates: []controllertesting.MockCreate{
func(_ client.Client, _ context.Context, obj runtime.Object) (controllertesting.MockHandled, error) {
if eventType, ok := obj.(*eventingv1alpha1.EventType); ok {
// Hack because the fakeClient does not support GenerateName.
if strings.Contains(eventType.Spec.Source, "topic1") {
eventType.Name = "name1"
}
return controllertesting.Unhandled, nil
}
return controllertesting.Unhandled, nil
},
},
},
WantPresent: []runtime.Object{
getReadyAndMarkEventTypesSourceWithKind(brokerKind),
getEventType("name1", "topic1"),
getEventType("name2", "topic2"),
},
WantAbsent: []runtime.Object{
getEventType("name3", "whatever_topic"),
},
}, {
Name: "successful delete event type",
InitialState: []runtime.Object{
getSource(),
getAddressable(),
getReceiveAdapter(),
getEventType("name1", "topic1"),
},
Mocks: controllertesting.Mocks{
MockCreates: []controllertesting.MockCreate{
func(_ client.Client, _ context.Context, _ runtime.Object) (controllertesting.MockHandled, error) {
return controllertesting.Handled, errors.New("an error that won't be seen because create is not called")
},
},
},
WantPresent: []runtime.Object{
getReadySourceAndMarkEventTypes(),
},
WantAbsent: []runtime.Object{
getEventType("name1", "topic1"),
},
}, {
Name: "cannot create event types",
Expand All @@ -205,8 +256,8 @@ func TestReconcile(t *testing.T) {
},
},
WantAbsent: []runtime.Object{
getEventType("name1", "group", "topic1"),
getEventType("name2", "group", "topic2"),
getEventType("name1", "topic1"),
getEventType("name2", "topic2"),
},
WantPresent: []runtime.Object{
getSourceWithSinkAndDeployedAndKind(brokerKind),
Expand Down Expand Up @@ -288,7 +339,7 @@ func getSourceWithKind(kind string) *sourcesv1alpha1.KafkaSource {
return obj
}

func getEventType(name, group, topic string) *eventingv1alpha1.EventType {
func getEventTypeForSource(name, topic string, source *sourcesv1alpha1.KafkaSource) *eventingv1alpha1.EventType {
return &eventingv1alpha1.EventType{
TypeMeta: metav1.TypeMeta{
APIVersion: eventingv1alpha1.SchemeGroupVersion.String(),
Expand All @@ -308,7 +359,7 @@ func getEventType(name, group, topic string) *eventingv1alpha1.EventType {
Name: name,
GenerateName: fmt.Sprintf("%s-", sourcesv1alpha1.KafkaEventType),
Namespace: testNS,
Labels: getLabels(getSourceWithKind(brokerKind)),
Labels: getLabels(source),
},
Spec: eventingv1alpha1.EventTypeSpec{
Type: sourcesv1alpha1.KafkaEventType,
Expand All @@ -318,6 +369,10 @@ func getEventType(name, group, topic string) *eventingv1alpha1.EventType {
}
}

func getEventType(name, topic string) *eventingv1alpha1.EventType {
return getEventTypeForSource(name, topic, getSourceWithKind(brokerKind))
}

func getSourceWithNoSink() *sourcesv1alpha1.KafkaSource {
src := getSource()
src.Status.InitializeConditions()
Expand Down Expand Up @@ -352,6 +407,12 @@ func getReadySourceWithKind(kind string) *sourcesv1alpha1.KafkaSource {
return src
}

func getReadySourceAndMarkEventTypes() *sourcesv1alpha1.KafkaSource {
src := getReadySource()
src.Status.MarkEventTypes()
return src
}

func getReadyAndMarkEventTypesSourceWithKind(kind string) *sourcesv1alpha1.KafkaSource {
src := getReadySourceWithKind(kind)
src.Status.MarkEventTypes()
Expand Down
9 changes: 9 additions & 0 deletions pkg/reconciler/eventtype/eventtype.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type ReconcilerArgs struct {
Specs []eventingv1alpha1.EventTypeSpec
Namespace string
Labels map[string]string
Kind string
}

// Reconcile reconciles the EventTypes taken from 'args', and sets 'owner' as the controller.
Expand Down Expand Up @@ -100,6 +101,14 @@ func (r *Reconciler) getEventTypes(ctx context.Context, namespace string, lbs ma
// makeEventTypes creates the in-memory representation of the EventTypes.
func (r *Reconciler) makeEventTypes(args *ReconcilerArgs, owner metav1.Object) ([]eventingv1alpha1.EventType, error) {
eventTypes := make([]eventingv1alpha1.EventType, 0)

// Only create EventTypes for Broker sinks.
// We add this check here in case the Source was changed from a Broker to non-Broker sink.
// If so, we need to delete the existing EventTypes, thus we return empty expected.
if args.Kind != "Broker" {
return eventTypes, nil
}

for _, spec := range args.Specs {
eventType := resources.MakeEventType(spec, args.Namespace, args.Labels)
// Setting the reference to delete the EventType upon uninstalling the source.
Expand Down
12 changes: 5 additions & 7 deletions pkg/reconciler/githubsource/githubsource.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,14 +195,11 @@ func (r *reconciler) reconcile(ctx context.Context, source *sourcesv1alpha1.GitH
return nil
}

// Only create EventTypes for Broker sinks.
if source.Spec.Sink.Kind == "Broker" {
err = r.reconcileEventTypes(ctx, source)
if err != nil {
return err
}
source.Status.MarkEventTypes()
err = r.reconcileEventTypes(ctx, source)
if err != nil {
return err
}
source.Status.MarkEventTypes()

return nil
}
Expand Down Expand Up @@ -366,6 +363,7 @@ func (r *reconciler) newEventTypeReconcilerArgs(source *sourcesv1alpha1.GitHubSo
Specs: specs,
Namespace: source.Namespace,
Labels: resources.Labels(source.Name),
Kind: source.Spec.Sink.Kind,
}
}

Expand Down
Loading

0 comments on commit b6cdf39

Please sign in to comment.