From fc37e5d1faf4663f5569d6ce4e7f11cae6ae02a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20St=C3=A4bler?= Date: Thu, 22 Aug 2024 10:42:26 +0200 Subject: [PATCH] Update TokenVerifier to verify authorization too (#4061) * tiny fix in OIDCDiscoveryConfig logger * Remove unneeded verify method of TokenVerifier * Move TokenVerifier to receiver package and TokenProvider to dispatcher package * Add classes for TokenMatchers (Exact and Prefix) * Add wrapper for EventPolicy * Add EventPolicies to IngressProducer * Add authorization check to TokenVerifier * run hack/update-codegen.sh --- .../dispatcher/impl/auth}/TokenProvider.java | 2 +- .../impl/http/WebClientCloudEventSender.java | 2 +- .../broker/receiver/IngressProducer.java | 7 ++ .../IngressProducerReconcilableStore.java | 15 +++- .../receiver/impl/ReceiverVerticle.java | 8 +- .../impl/auth/AuthenticationException.java | 27 ++++++ .../impl/auth/AuthorizationException.java | 27 ++++++ .../receiver/impl/auth/EventPolicy.java | 47 +++++++++++ .../receiver/impl/auth/ExactTokenMatcher.java | 46 ++++++++++ .../impl/auth}/OIDCDiscoveryConfig.java | 4 +- .../auth}/OIDCDiscoveryConfigListener.java | 2 +- .../broker/receiver/impl/auth}/OIDCInfo.java | 2 +- .../impl/auth/PrefixTokenMatcher.java | 46 ++++++++++ .../receiver/impl/auth/TokenMatcher.java | 38 +++++++++ .../receiver/impl/auth}/TokenVerifier.java | 8 +- .../impl/auth}/TokenVerifierImpl.java | 45 +++++++--- .../impl/handler/AuthenticationHandler.java | 34 ++++++-- .../kafka/broker/receiver/main/Main.java | 2 +- .../main/ReceiverVerticleFactory.java | 2 +- .../receiver/impl/ReceiverVerticleTest.java | 2 +- .../impl/ReceiverVerticleTracingTest.java | 2 +- .../impl/auth/ExactTokenMatcherTest.java | 61 ++++++++++++++ .../impl/auth/PrefixTokenMatcherTest.java | 62 ++++++++++++++ .../handler/AuthenticationHandlerTest.java | 84 +++++++++++++++---- .../IngressRequestHandlerImplTest.java | 12 +++ .../main/ReceiverVerticleFactoryTest.java | 2 +- .../broker/tests/AbstractDataPlaneTest.java | 2 +- 27 files changed, 534 insertions(+), 57 deletions(-) rename data-plane/{core/src/main/java/dev/knative/eventing/kafka/broker/core/oidc => dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/auth}/TokenProvider.java (98%) create mode 100644 data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/AuthenticationException.java create mode 100644 data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/AuthorizationException.java create mode 100644 data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/EventPolicy.java create mode 100644 data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/ExactTokenMatcher.java rename data-plane/{core/src/main/java/dev/knative/eventing/kafka/broker/core/oidc => receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/auth}/OIDCDiscoveryConfig.java (98%) rename data-plane/{core/src/main/java/dev/knative/eventing/kafka/broker/core/oidc => receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/auth}/OIDCDiscoveryConfigListener.java (98%) rename data-plane/{core/src/main/java/dev/knative/eventing/kafka/broker/core/oidc => receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/auth}/OIDCInfo.java (95%) create mode 100644 data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/PrefixTokenMatcher.java create mode 100644 data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/TokenMatcher.java rename data-plane/{core/src/main/java/dev/knative/eventing/kafka/broker/core/oidc => receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/auth}/TokenVerifier.java (75%) rename data-plane/{core/src/main/java/dev/knative/eventing/kafka/broker/core/oidc => receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/auth}/TokenVerifierImpl.java (61%) create mode 100644 data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/ExactTokenMatcherTest.java create mode 100644 data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/PrefixTokenMatcherTest.java diff --git a/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/oidc/TokenProvider.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/auth/TokenProvider.java similarity index 98% rename from data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/oidc/TokenProvider.java rename to data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/auth/TokenProvider.java index 066024f881..a137e65d6a 100644 --- a/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/oidc/TokenProvider.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/auth/TokenProvider.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package dev.knative.eventing.kafka.broker.core.oidc; +package dev.knative.eventing.kafka.broker.dispatcher.impl.auth; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/http/WebClientCloudEventSender.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/http/WebClientCloudEventSender.java index 25793a68c1..dd719b15e8 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/http/WebClientCloudEventSender.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/http/WebClientCloudEventSender.java @@ -20,10 +20,10 @@ import dev.knative.eventing.kafka.broker.contract.DataPlaneContract; import dev.knative.eventing.kafka.broker.core.NamespacedName; import dev.knative.eventing.kafka.broker.core.metrics.Metrics; -import dev.knative.eventing.kafka.broker.core.oidc.TokenProvider; import dev.knative.eventing.kafka.broker.core.tracing.TracingSpan; import dev.knative.eventing.kafka.broker.dispatcher.CloudEventSender; import dev.knative.eventing.kafka.broker.dispatcher.impl.ResponseFailureException; +import dev.knative.eventing.kafka.broker.dispatcher.impl.auth.TokenProvider; import dev.knative.eventing.kafka.broker.dispatcher.main.ConsumerVerticleContext; import io.cloudevents.CloudEvent; import io.cloudevents.http.vertx.VertxMessageFactory; diff --git a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/IngressProducer.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/IngressProducer.java index 2a89850119..1e026e29e0 100644 --- a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/IngressProducer.java +++ b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/IngressProducer.java @@ -18,9 +18,11 @@ import dev.knative.eventing.kafka.broker.contract.DataPlaneContract; import dev.knative.eventing.kafka.broker.core.ReactiveKafkaProducer; import dev.knative.eventing.kafka.broker.core.eventtype.EventType; +import dev.knative.eventing.kafka.broker.receiver.impl.auth.EventPolicy; import io.cloudevents.CloudEvent; import io.fabric8.kubernetes.client.informers.cache.Lister; import io.vertx.core.Future; +import java.util.List; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; @@ -69,4 +71,9 @@ default boolean isEventTypeAutocreateEnabled() { * @return the OIDC audience for the ingress. */ String getAudience(); + + /** + * @return the applying EventPolicies for the ingress. + */ + List getEventPolicies(); } diff --git a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/IngressProducerReconcilableStore.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/IngressProducerReconcilableStore.java index c107f2b7aa..ab0ee63f36 100644 --- a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/IngressProducerReconcilableStore.java +++ b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/IngressProducerReconcilableStore.java @@ -27,12 +27,14 @@ import dev.knative.eventing.kafka.broker.core.security.KafkaClientsAuth; import dev.knative.eventing.kafka.broker.core.utils.ReferenceCounter; import dev.knative.eventing.kafka.broker.receiver.IngressProducer; +import dev.knative.eventing.kafka.broker.receiver.impl.auth.EventPolicy; import io.cloudevents.CloudEvent; import io.cloudevents.core.message.Encoding; import io.cloudevents.jackson.JsonFormat; import io.cloudevents.kafka.CloudEventSerializer; import io.fabric8.kubernetes.client.informers.cache.Lister; import io.vertx.core.Future; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Properties; @@ -167,7 +169,8 @@ private Future onNewIngress( producerProps, ingress.getEnableAutoCreateEventTypes(), this.eventTypeListerFactory.getForNamespace( - resource.getReference().getNamespace())); + resource.getReference().getNamespace()), + EventPolicy.fromContract(ingress.getEventPoliciesList())); if (isRootPath(ingress.getPath()) && Strings.isNullOrEmpty(ingress.getHost())) { throw new IllegalArgumentException( @@ -276,6 +279,7 @@ private static class IngressProducerImpl implements IngressProducer { private final Properties producerProperties; private final DataPlaneContract.Reference reference; private final String audience; + private final List eventPolicies; private final boolean eventTypeAutocreateEnabled; private final Lister eventTypeLister; @@ -287,7 +291,8 @@ private static class IngressProducerImpl implements IngressProducer { final String host, final Properties producerProperties, final boolean eventTypeAutocreateEnabled, - Lister eventTypeLister) { + Lister eventTypeLister, + final List eventPolicies) { this.producer = producer; this.topic = resource.getTopics(0); this.reference = resource.getReference(); @@ -297,6 +302,7 @@ private static class IngressProducerImpl implements IngressProducer { this.producerProperties = producerProperties; this.eventTypeAutocreateEnabled = eventTypeAutocreateEnabled; this.eventTypeLister = eventTypeLister; + this.eventPolicies = eventPolicies; } @Override @@ -314,6 +320,11 @@ public String getAudience() { return audience; } + @Override + public List getEventPolicies() { + return eventPolicies; + } + @Override public DataPlaneContract.Reference getReference() { return reference; diff --git a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticle.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticle.java index f8e268dbb5..92b3671515 100644 --- a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticle.java +++ b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticle.java @@ -22,15 +22,15 @@ import static io.netty.handler.codec.http.HttpResponseStatus.OK; import dev.knative.eventing.kafka.broker.core.file.FileWatcher; -import dev.knative.eventing.kafka.broker.core.oidc.OIDCDiscoveryConfig; -import dev.knative.eventing.kafka.broker.core.oidc.OIDCDiscoveryConfigListener; -import dev.knative.eventing.kafka.broker.core.oidc.TokenVerifier; -import dev.knative.eventing.kafka.broker.core.oidc.TokenVerifierImpl; import dev.knative.eventing.kafka.broker.core.reconciler.IngressReconcilerListener; import dev.knative.eventing.kafka.broker.core.reconciler.ResourcesReconciler; import dev.knative.eventing.kafka.broker.receiver.IngressProducer; import dev.knative.eventing.kafka.broker.receiver.IngressRequestHandler; import dev.knative.eventing.kafka.broker.receiver.RequestContext; +import dev.knative.eventing.kafka.broker.receiver.impl.auth.OIDCDiscoveryConfig; +import dev.knative.eventing.kafka.broker.receiver.impl.auth.OIDCDiscoveryConfigListener; +import dev.knative.eventing.kafka.broker.receiver.impl.auth.TokenVerifier; +import dev.knative.eventing.kafka.broker.receiver.impl.auth.TokenVerifierImpl; import dev.knative.eventing.kafka.broker.receiver.impl.handler.AuthenticationHandler; import dev.knative.eventing.kafka.broker.receiver.impl.handler.MethodNotAllowedHandler; import dev.knative.eventing.kafka.broker.receiver.impl.handler.ProbeHandler; diff --git a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/AuthenticationException.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/AuthenticationException.java new file mode 100644 index 0000000000..f4df8f7736 --- /dev/null +++ b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/AuthenticationException.java @@ -0,0 +1,27 @@ +/* + * Copyright © 2018 Knative Authors (knative-dev@googlegroups.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dev.knative.eventing.kafka.broker.receiver.impl.auth; + +public class AuthenticationException extends Exception { + + public AuthenticationException(String message) { + super(message); + } + + public AuthenticationException(Exception e) { + super(e); + } +} diff --git a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/AuthorizationException.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/AuthorizationException.java new file mode 100644 index 0000000000..a18f9c6822 --- /dev/null +++ b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/AuthorizationException.java @@ -0,0 +1,27 @@ +/* + * Copyright © 2018 Knative Authors (knative-dev@googlegroups.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dev.knative.eventing.kafka.broker.receiver.impl.auth; + +public class AuthorizationException extends Exception { + + public AuthorizationException(String message) { + super(message); + } + + public AuthorizationException(Exception e) { + super(e); + } +} diff --git a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/EventPolicy.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/EventPolicy.java new file mode 100644 index 0000000000..aa5eba380f --- /dev/null +++ b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/EventPolicy.java @@ -0,0 +1,47 @@ +/* + * Copyright © 2018 Knative Authors (knative-dev@googlegroups.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dev.knative.eventing.kafka.broker.receiver.impl.auth; + +import dev.knative.eventing.kafka.broker.contract.DataPlaneContract; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class EventPolicy { + private final List tokenMatchers; + + public static EventPolicy fromContract(DataPlaneContract.EventPolicy contractEventPolicy) { + return new EventPolicy(TokenMatcher.fromContract(contractEventPolicy.getTokenMatchersList())); + } + + public static List fromContract(List contractEventPolicies) { + return contractEventPolicies.stream().map(EventPolicy::fromContract).collect(Collectors.toList()); + } + + public EventPolicy(List tokenMatchers) { + this.tokenMatchers = tokenMatchers; + } + + public boolean isAuthorized(Map> claims) { + for (TokenMatcher matcher : tokenMatchers) { + if (matcher.match(claims)) { + return true; + } + } + + return false; + } +} diff --git a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/ExactTokenMatcher.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/ExactTokenMatcher.java new file mode 100644 index 0000000000..5abec61f3e --- /dev/null +++ b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/ExactTokenMatcher.java @@ -0,0 +1,46 @@ +/* + * Copyright © 2018 Knative Authors (knative-dev@googlegroups.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dev.knative.eventing.kafka.broker.receiver.impl.auth; + +import dev.knative.eventing.kafka.broker.contract.DataPlaneContract; +import java.util.List; +import java.util.Map; + +class ExactTokenMatcher implements TokenMatcher { + + private final Map requiredMatches; + + public static ExactTokenMatcher fromContract(DataPlaneContract.Exact exactTokenMatcher) { + return new ExactTokenMatcher(exactTokenMatcher.getAttributesMap()); + } + + public ExactTokenMatcher(Map requiredMatches) { + this.requiredMatches = requiredMatches; + } + + @Override + public boolean match(Map> claims) { + for (var requiredMatch : requiredMatches.entrySet()) { + if (!claims.containsKey(requiredMatch.getKey()) + || !claims.get(requiredMatch.getKey()).contains(requiredMatch.getValue())) { + // as soon as one of the required claims does not match, the matcher should fail + return false; + } + } + + return true; + } +} diff --git a/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/oidc/OIDCDiscoveryConfig.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/OIDCDiscoveryConfig.java similarity index 98% rename from data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/oidc/OIDCDiscoveryConfig.java rename to data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/OIDCDiscoveryConfig.java index 4e5d18c769..39a0330811 100644 --- a/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/oidc/OIDCDiscoveryConfig.java +++ b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/OIDCDiscoveryConfig.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package dev.knative.eventing.kafka.broker.core.oidc; +package dev.knative.eventing.kafka.broker.receiver.impl.auth; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; @@ -32,7 +32,7 @@ public class OIDCDiscoveryConfig { - private static final Logger logger = LoggerFactory.getLogger(TokenVerifier.class); + private static final Logger logger = LoggerFactory.getLogger(OIDCDiscoveryConfig.class); private static final String OIDC_DISCOVERY_URL = "https://kubernetes.default.svc/.well-known/openid-configuration"; diff --git a/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/oidc/OIDCDiscoveryConfigListener.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/OIDCDiscoveryConfigListener.java similarity index 98% rename from data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/oidc/OIDCDiscoveryConfigListener.java rename to data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/OIDCDiscoveryConfigListener.java index 7f0bb717df..017e6d35ee 100644 --- a/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/oidc/OIDCDiscoveryConfigListener.java +++ b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/OIDCDiscoveryConfigListener.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package dev.knative.eventing.kafka.broker.core.oidc; +package dev.knative.eventing.kafka.broker.receiver.impl.auth; import dev.knative.eventing.kafka.broker.core.features.FeaturesConfig; import dev.knative.eventing.kafka.broker.core.file.FileWatcher; diff --git a/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/oidc/OIDCInfo.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/OIDCInfo.java similarity index 95% rename from data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/oidc/OIDCInfo.java rename to data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/OIDCInfo.java index d70efc9af0..aafaf54721 100644 --- a/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/oidc/OIDCInfo.java +++ b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/OIDCInfo.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package dev.knative.eventing.kafka.broker.core.oidc; +package dev.knative.eventing.kafka.broker.receiver.impl.auth; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; diff --git a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/PrefixTokenMatcher.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/PrefixTokenMatcher.java new file mode 100644 index 0000000000..ee72e224e6 --- /dev/null +++ b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/PrefixTokenMatcher.java @@ -0,0 +1,46 @@ +/* + * Copyright © 2018 Knative Authors (knative-dev@googlegroups.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dev.knative.eventing.kafka.broker.receiver.impl.auth; + +import dev.knative.eventing.kafka.broker.contract.DataPlaneContract; +import java.util.List; +import java.util.Map; + +class PrefixTokenMatcher implements TokenMatcher { + private final Map requiredMatches; + + public static PrefixTokenMatcher fromContract(DataPlaneContract.Prefix prefixTokenMatcher) { + return new PrefixTokenMatcher(prefixTokenMatcher.getAttributesMap()); + } + + public PrefixTokenMatcher(Map requiredMatches) { + this.requiredMatches = requiredMatches; + } + + @Override + public boolean match(Map> claims) { + for (Map.Entry requiredMatch : requiredMatches.entrySet()) { + if (!claims.containsKey(requiredMatch.getKey()) + || !claims.get(requiredMatch.getKey()).stream() + .anyMatch(s -> s.startsWith(requiredMatch.getValue()))) { + // as soon as one of the required claims does not match, the matcher should fail + return false; + } + } + + return true; + } +} diff --git a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/TokenMatcher.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/TokenMatcher.java new file mode 100644 index 0000000000..0eb398f9da --- /dev/null +++ b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/TokenMatcher.java @@ -0,0 +1,38 @@ +/* + * Copyright © 2018 Knative Authors (knative-dev@googlegroups.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dev.knative.eventing.kafka.broker.receiver.impl.auth; + +import dev.knative.eventing.kafka.broker.contract.DataPlaneContract; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public interface TokenMatcher { + boolean match(Map> claims); + + static List fromContract(List contractTokenMatchers) { + List matchers = new ArrayList<>(contractTokenMatchers.size()); + + for (var contractTokenMatcher : contractTokenMatchers) { + switch (contractTokenMatcher.getMatcherCase()) { + case EXACT -> matchers.add(ExactTokenMatcher.fromContract(contractTokenMatcher.getExact())); + case PREFIX -> matchers.add(PrefixTokenMatcher.fromContract(contractTokenMatcher.getPrefix())); + } + } + + return matchers; + } +} diff --git a/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/oidc/TokenVerifier.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/TokenVerifier.java similarity index 75% rename from data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/oidc/TokenVerifier.java rename to data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/TokenVerifier.java index 521b4f4514..3ffb986c24 100644 --- a/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/oidc/TokenVerifier.java +++ b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/TokenVerifier.java @@ -13,14 +13,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package dev.knative.eventing.kafka.broker.core.oidc; +package dev.knative.eventing.kafka.broker.receiver.impl.auth; +import dev.knative.eventing.kafka.broker.receiver.IngressProducer; import io.vertx.core.Future; import io.vertx.core.http.HttpServerRequest; -import org.jose4j.jwt.JwtClaims; public interface TokenVerifier { - Future verify(String token, String expectedAudience); - - Future verify(HttpServerRequest request, String expectedAudience); + Future verify(HttpServerRequest request, IngressProducer ingressInfo); } diff --git a/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/oidc/TokenVerifierImpl.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/TokenVerifierImpl.java similarity index 61% rename from data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/oidc/TokenVerifierImpl.java rename to data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/TokenVerifierImpl.java index b5312e0bbb..164d0e69cf 100644 --- a/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/oidc/TokenVerifierImpl.java +++ b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/TokenVerifierImpl.java @@ -13,17 +13,17 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package dev.knative.eventing.kafka.broker.core.oidc; +package dev.knative.eventing.kafka.broker.receiver.impl.auth; import dev.knative.eventing.kafka.broker.core.features.FeaturesConfig; +import dev.knative.eventing.kafka.broker.receiver.IngressProducer; import io.vertx.core.Future; import io.vertx.core.Vertx; import io.vertx.core.http.HttpServerRequest; +import java.util.Map; +import java.util.stream.Collectors; import org.jose4j.jwt.JwtClaims; -import org.jose4j.jwt.consumer.InvalidJwtException; -import org.jose4j.jwt.consumer.JwtConsumer; -import org.jose4j.jwt.consumer.JwtConsumerBuilder; -import org.jose4j.jwt.consumer.JwtContext; +import org.jose4j.jwt.consumer.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,7 +40,7 @@ public TokenVerifierImpl(Vertx vertx, OIDCDiscoveryConfig oidcDiscoveryConfig) { this.oidcDiscoveryConfig = oidcDiscoveryConfig; } - public Future verify(String token, String expectedAudience) { + private Future verifyAuthN(String token, IngressProducer ingressInfo) { return this.vertx.executeBlocking( promise -> { // execute blocking, as jose .process() is blocking @@ -54,7 +54,7 @@ public Future verify(String token, String expectedAudience) { JwtConsumer jwtConsumer = new JwtConsumerBuilder() .setVerificationKeyResolver(this.oidcDiscoveryConfig.getJwksVerificationKeyResolver()) - .setExpectedAudience(expectedAudience) + .setExpectedAudience(ingressInfo.getAudience()) .setExpectedIssuer(this.oidcDiscoveryConfig.getIssuer()) .build(); @@ -63,25 +63,46 @@ public Future verify(String token, String expectedAudience) { promise.complete(jwtContext.getJwtClaims()); } catch (InvalidJwtException e) { - promise.fail(e); + promise.fail(new AuthenticationException(e)); } }, false); } - public Future verify(final HttpServerRequest request, String expectedAudience) { + private Future verifyAuthN(final HttpServerRequest request, IngressProducer ingressInfo) { String authHeader = request.getHeader("Authorization"); if (authHeader == null || authHeader.isEmpty()) { - return Future.failedFuture("Request didn't contain Authorization header"); + return Future.failedFuture(new AuthenticationException("Request didn't contain Authorization header")); } if (!authHeader.startsWith("Bearer ") && authHeader.length() <= "Bearer ".length()) { - return Future.failedFuture("Authorization header didn't contain Bearer token"); + return Future.failedFuture(new AuthenticationException("Authorization header didn't contain Bearer token")); } String token = authHeader.substring("Bearer ".length()); request.pause(); - return verify(token, expectedAudience).onSuccess(v -> request.resume()); + return verifyAuthN(token, ingressInfo).onSuccess(v -> request.resume()); + } + + private Future verifyAuthZ(JwtClaims claims, IngressProducer ingressInfo) { + // claims from Map> to Map> + var convertedClaims = claims.flattenClaims().entrySet().stream() + .collect(Collectors.toMap( + Map.Entry::getKey, + v -> v.getValue().stream().map(Object::toString).toList())); + + for (EventPolicy ep : ingressInfo.getEventPolicies()) { + if (ep.isAuthorized(convertedClaims)) { + // as soon as one policy allows it, we're good + return Future.succeededFuture(); + } + } + + return Future.failedFuture(new AuthorizationException("Not authorized by any EventPolicy")); + } + + public Future verify(final HttpServerRequest request, IngressProducer ingressInfo) { + return verifyAuthN(request, ingressInfo).compose(jwtClaims -> verifyAuthZ(jwtClaims, ingressInfo)); } } diff --git a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/AuthenticationHandler.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/AuthenticationHandler.java index a1283ac275..6e07e502f8 100644 --- a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/AuthenticationHandler.java +++ b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/AuthenticationHandler.java @@ -17,8 +17,10 @@ import static dev.knative.eventing.kafka.broker.core.utils.Logging.keyValue; -import dev.knative.eventing.kafka.broker.core.oidc.TokenVerifier; import dev.knative.eventing.kafka.broker.receiver.IngressProducer; +import dev.knative.eventing.kafka.broker.receiver.impl.auth.AuthenticationException; +import dev.knative.eventing.kafka.broker.receiver.impl.auth.AuthorizationException; +import dev.knative.eventing.kafka.broker.receiver.impl.auth.TokenVerifier; import io.netty.handler.codec.http.HttpResponseStatus; import io.vertx.core.Handler; import io.vertx.core.http.HttpServerRequest; @@ -46,15 +48,31 @@ public void handle( } tokenVerifier - .verify(request, ingressInfo.getAudience()) + .verify(request, ingressInfo) .onFailure(e -> { - logger.debug("Failed to verify authentication of request: {}", keyValue("error", e.getMessage())); - request.response() - .setStatusCode(HttpResponseStatus.UNAUTHORIZED.code()) - .end(); + if (e instanceof AuthenticationException) { + logger.debug( + "Failed to verify authentication of request: {}", keyValue("error", e.getMessage())); + request.response() + .setStatusCode(HttpResponseStatus.UNAUTHORIZED.code()) + .end(); + } else if (e instanceof AuthorizationException) { + logger.debug( + "Failed to verify authorization of request: {}", keyValue("error", e.getMessage())); + request.response() + .setStatusCode(HttpResponseStatus.FORBIDDEN.code()) + .end(); + } else { + logger.debug( + "Got unexpected exception on verifying auth of request: {}", + keyValue("error", e.getMessage())); + request.response() + .setStatusCode(HttpResponseStatus.INTERNAL_SERVER_ERROR.code()) + .end(); + } }) - .onSuccess(jwtClaims -> { - logger.debug("Request contained valid JWT. Continuing..."); + .onSuccess(v -> { + logger.debug("Request was authenticated and authorized. Continuing..."); next.handle(request); }); } diff --git a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/Main.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/Main.java index f39c499385..40b363b463 100644 --- a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/Main.java +++ b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/Main.java @@ -24,11 +24,11 @@ import dev.knative.eventing.kafka.broker.core.eventtype.EventTypeListerFactory; import dev.knative.eventing.kafka.broker.core.file.FileWatcher; import dev.knative.eventing.kafka.broker.core.metrics.Metrics; -import dev.knative.eventing.kafka.broker.core.oidc.OIDCDiscoveryConfigListener; import dev.knative.eventing.kafka.broker.core.reconciler.impl.ResourcesReconcilerMessageHandler; import dev.knative.eventing.kafka.broker.core.tracing.TracingConfig; import dev.knative.eventing.kafka.broker.core.utils.Configurations; import dev.knative.eventing.kafka.broker.core.utils.Shutdown; +import dev.knative.eventing.kafka.broker.receiver.impl.auth.OIDCDiscoveryConfigListener; import io.cloudevents.kafka.CloudEventSerializer; import io.cloudevents.kafka.PartitionKeyExtensionInterceptor; import io.fabric8.kubernetes.client.KubernetesClientBuilder; diff --git a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactory.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactory.java index be195728cf..aefe5d41a3 100644 --- a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactory.java +++ b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactory.java @@ -19,12 +19,12 @@ import dev.knative.eventing.kafka.broker.core.eventtype.EventType; import dev.knative.eventing.kafka.broker.core.eventtype.EventTypeCreatorImpl; import dev.knative.eventing.kafka.broker.core.eventtype.EventTypeListerFactory; -import dev.knative.eventing.kafka.broker.core.oidc.OIDCDiscoveryConfigListener; import dev.knative.eventing.kafka.broker.core.security.AuthProvider; import dev.knative.eventing.kafka.broker.receiver.IngressRequestHandler; import dev.knative.eventing.kafka.broker.receiver.impl.IngressProducerReconcilableStore; import dev.knative.eventing.kafka.broker.receiver.impl.ReceiverVerticle; import dev.knative.eventing.kafka.broker.receiver.impl.StrictRequestToRecordMapper; +import dev.knative.eventing.kafka.broker.receiver.impl.auth.OIDCDiscoveryConfigListener; import dev.knative.eventing.kafka.broker.receiver.impl.handler.IngressRequestHandlerImpl; import io.cloudevents.CloudEvent; import io.fabric8.kubernetes.api.model.KubernetesResourceList; diff --git a/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticleTest.java b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticleTest.java index e74e2f4962..9589827f1b 100644 --- a/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticleTest.java +++ b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticleTest.java @@ -36,11 +36,11 @@ import dev.knative.eventing.kafka.broker.core.eventbus.ContractPublisher; import dev.knative.eventing.kafka.broker.core.eventtype.EventTypeListerFactory; import dev.knative.eventing.kafka.broker.core.metrics.Metrics; -import dev.knative.eventing.kafka.broker.core.oidc.OIDCDiscoveryConfigListener; import dev.knative.eventing.kafka.broker.core.reconciler.impl.ResourcesReconcilerMessageHandler; import dev.knative.eventing.kafka.broker.core.security.AuthProvider; import dev.knative.eventing.kafka.broker.core.testing.CloudEventSerializerMock; import dev.knative.eventing.kafka.broker.receiver.MockReactiveKafkaProducer; +import dev.knative.eventing.kafka.broker.receiver.impl.auth.OIDCDiscoveryConfigListener; import dev.knative.eventing.kafka.broker.receiver.impl.handler.ControlPlaneProbeRequestUtil; import dev.knative.eventing.kafka.broker.receiver.impl.handler.IngressRequestHandlerImpl; import dev.knative.eventing.kafka.broker.receiver.main.ReceiverEnv; diff --git a/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticleTracingTest.java b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticleTracingTest.java index b191d3392e..f550dcf557 100644 --- a/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticleTracingTest.java +++ b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticleTracingTest.java @@ -24,9 +24,9 @@ import dev.knative.eventing.kafka.broker.core.ReactiveKafkaProducer; import dev.knative.eventing.kafka.broker.core.eventtype.EventTypeListerFactory; import dev.knative.eventing.kafka.broker.core.metrics.Metrics; -import dev.knative.eventing.kafka.broker.core.oidc.OIDCDiscoveryConfigListener; import dev.knative.eventing.kafka.broker.core.security.AuthProvider; import dev.knative.eventing.kafka.broker.core.testing.CloudEventSerializerMock; +import dev.knative.eventing.kafka.broker.receiver.impl.auth.OIDCDiscoveryConfigListener; import dev.knative.eventing.kafka.broker.receiver.impl.handler.IngressRequestHandlerImpl; import dev.knative.eventing.kafka.broker.receiver.main.ReceiverEnv; import io.cloudevents.CloudEvent; diff --git a/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/ExactTokenMatcherTest.java b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/ExactTokenMatcherTest.java new file mode 100644 index 0000000000..23ffd343f9 --- /dev/null +++ b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/ExactTokenMatcherTest.java @@ -0,0 +1,61 @@ +/* + * Copyright © 2018 Knative Authors (knative-dev@googlegroups.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dev.knative.eventing.kafka.broker.receiver.impl.auth; + +import static org.junit.jupiter.api.Assertions.*; + +import java.util.List; +import java.util.Map; +import org.junit.jupiter.api.Test; + +class ExactTokenMatcherTest { + @Test + void match() { + ExactTokenMatcher matcher = new ExactTokenMatcher(Map.of("key", "val")); + + assertTrue(matcher.match(Map.of("key", List.of("val")))); + } + + @Test + void doNotMatchWithWrongValue() { + ExactTokenMatcher matcher = new ExactTokenMatcher(Map.of("key", "val")); + + assertFalse(matcher.match(Map.of("key", List.of("value")))); + } + + @Test + void matchMultiple() { + ExactTokenMatcher matcher = new ExactTokenMatcher(Map.of("key1", "val1", "key2", "val2")); + + assertTrue(matcher.match(Map.of("key1", List.of("val1"), "key2", List.of("val2")))); + assertFalse(matcher.match(Map.of("key1", List.of("val2"), "key2", List.of("val1")))); + } + + @Test + void matchAnyOfSameKey() { + ExactTokenMatcher matcher = new ExactTokenMatcher(Map.of("key1", "val1")); + + assertTrue(matcher.match(Map.of("key1", List.of("val1", "val2")))); + } + + @Test + void matchFailAsSoonAsAnyOfRequiredClaimsIsNotMatching() { + ExactTokenMatcher matcher = new ExactTokenMatcher(Map.of("key1", "val1", "key2", "val2")); + + assertFalse(matcher.match(Map.of("key1", List.of("val1")))); + assertFalse(matcher.match(Map.of("key1", List.of("val1"), "key2", List.of("foobar")))); + } +} diff --git a/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/PrefixTokenMatcherTest.java b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/PrefixTokenMatcherTest.java new file mode 100644 index 0000000000..6b0dc89756 --- /dev/null +++ b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/PrefixTokenMatcherTest.java @@ -0,0 +1,62 @@ +/* + * Copyright © 2018 Knative Authors (knative-dev@googlegroups.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dev.knative.eventing.kafka.broker.receiver.impl.auth; + +import static org.junit.jupiter.api.Assertions.*; + +import java.util.List; +import java.util.Map; +import org.junit.jupiter.api.Test; + +class PrefixTokenMatcherTest { + + @Test + void match() { + PrefixTokenMatcher matcher = new PrefixTokenMatcher(Map.of("key", "prefix")); + + assertTrue(matcher.match(Map.of("key", List.of("prefix-foo")))); + } + + @Test + void doNotMatchWithWrongPrefix() { + PrefixTokenMatcher matcher = new PrefixTokenMatcher(Map.of("key", "prefix")); + + assertFalse(matcher.match(Map.of("key", List.of("foo-bar")))); + } + + @Test + void matchMultiple() { + PrefixTokenMatcher matcher = new PrefixTokenMatcher(Map.of("key1", "prefix1", "key2", "prefix2")); + + assertTrue(matcher.match(Map.of("key1", List.of("prefix1-foo"), "key2", List.of("prefix2-foo")))); + assertFalse(matcher.match(Map.of("key1", List.of("prefix1-foo"), "key2", List.of("prefix1-foo")))); + } + + @Test + void matchAnyOfSameKey() { + PrefixTokenMatcher matcher = new PrefixTokenMatcher(Map.of("key1", "prefix1")); + + assertTrue(matcher.match(Map.of("key1", List.of("prefix1-foo", "prefix-2")))); + } + + @Test + void matchFailAsSoonAsAnyOfRequiredClaimsIsNotMatching() { + PrefixTokenMatcher matcher = new PrefixTokenMatcher(Map.of("key1", "prefix1", "key2", "prefix2")); + + assertFalse(matcher.match(Map.of("key1", List.of("prefix1-foo")))); + assertFalse(matcher.match(Map.of("key1", List.of("prefix1-foo"), "key2", List.of("foobar")))); + } +} diff --git a/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/AuthenticationHandlerTest.java b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/AuthenticationHandlerTest.java index 1124d56882..84bd88ad06 100644 --- a/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/AuthenticationHandlerTest.java +++ b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/AuthenticationHandlerTest.java @@ -22,8 +22,11 @@ import dev.knative.eventing.kafka.broker.contract.DataPlaneContract; import dev.knative.eventing.kafka.broker.core.ReactiveKafkaProducer; import dev.knative.eventing.kafka.broker.core.eventtype.EventType; -import dev.knative.eventing.kafka.broker.core.oidc.TokenVerifier; import dev.knative.eventing.kafka.broker.receiver.IngressProducer; +import dev.knative.eventing.kafka.broker.receiver.impl.auth.AuthenticationException; +import dev.knative.eventing.kafka.broker.receiver.impl.auth.AuthorizationException; +import dev.knative.eventing.kafka.broker.receiver.impl.auth.EventPolicy; +import dev.knative.eventing.kafka.broker.receiver.impl.auth.TokenVerifier; import io.cloudevents.CloudEvent; import io.fabric8.kubernetes.client.informers.cache.Lister; import io.netty.handler.codec.http.HttpResponseStatus; @@ -31,7 +34,7 @@ import io.vertx.core.Handler; import io.vertx.core.http.HttpServerRequest; import io.vertx.core.http.HttpServerResponse; -import org.jose4j.jwt.JwtClaims; +import java.util.List; import org.junit.jupiter.api.Test; public class AuthenticationHandlerTest { @@ -42,13 +45,61 @@ public void shouldReturnUnauthorizedWhenJWTValidationFails() { TokenVerifier tokenVerifier = new TokenVerifier() { @Override - public Future verify(String token, String expectedAudience) { - return Future.failedFuture("JWT validation failed"); + public Future verify(HttpServerRequest request, IngressProducer ingressInfo) { + return Future.failedFuture(new AuthenticationException("JWT validation failed")); } + }; + + final AuthenticationHandler authHandler = new AuthenticationHandler(tokenVerifier); + authHandler.handle( + request, + new IngressProducer() { + @Override + public ReactiveKafkaProducer getKafkaProducer() { + return null; + } + + @Override + public String getTopic() { + return null; + } + + @Override + public DataPlaneContract.Reference getReference() { + return null; + } + + @Override + public Lister getEventTypeLister() { + return mock(Lister.class); + } + + @Override + public String getAudience() { + return "some-required-audience"; + } + + @Override + public List getEventPolicies() { + return null; + } + }, + mock(Handler.class)); + + verify(response, times(1)).setStatusCode(HttpResponseStatus.UNAUTHORIZED.code()); + verify(response, times(1)).end(); + } + + @Test + public void shouldReturnForbiddenWhenAuthorizationFails() { + final HttpServerRequest request = mock(HttpServerRequest.class); + final var response = mockResponse(request, HttpResponseStatus.FORBIDDEN.code()); + + TokenVerifier tokenVerifier = new TokenVerifier() { @Override - public Future verify(HttpServerRequest request, String expectedAudience) { - return Future.failedFuture("JWT validation failed"); + public Future verify(HttpServerRequest request, IngressProducer ingressInfo) { + return Future.failedFuture(new AuthorizationException("AuthZ failed")); } }; @@ -81,10 +132,15 @@ public Lister getEventTypeLister() { public String getAudience() { return "some-required-audience"; } + + @Override + public List getEventPolicies() { + return null; + } }, mock(Handler.class)); - verify(response, times(1)).setStatusCode(HttpResponseStatus.UNAUTHORIZED.code()); + verify(response, times(1)).setStatusCode(HttpResponseStatus.FORBIDDEN.code()); verify(response, times(1)).end(); } @@ -95,13 +151,8 @@ public void shouldContinueWithRequestWhenJWTSucceeds() { TokenVerifier tokenVerifier = new TokenVerifier() { @Override - public Future verify(String token, String expectedAudience) { - return Future.succeededFuture(new JwtClaims()); - } - - @Override - public Future verify(HttpServerRequest request, String expectedAudience) { - return Future.succeededFuture(new JwtClaims()); + public Future verify(HttpServerRequest request, IngressProducer ingressInfo) { + return Future.succeededFuture(); } }; @@ -134,6 +185,11 @@ public Lister getEventTypeLister() { public String getAudience() { return "some-required-audience"; } + + @Override + public List getEventPolicies() { + return null; + } }, next); diff --git a/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/IngressRequestHandlerImplTest.java b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/IngressRequestHandlerImplTest.java index e72f45613e..5289d09ddc 100644 --- a/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/IngressRequestHandlerImplTest.java +++ b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/IngressRequestHandlerImplTest.java @@ -30,6 +30,7 @@ import dev.knative.eventing.kafka.broker.receiver.MockReactiveKafkaProducer; import dev.knative.eventing.kafka.broker.receiver.RequestContext; import dev.knative.eventing.kafka.broker.receiver.RequestToRecordMapper; +import dev.knative.eventing.kafka.broker.receiver.impl.auth.EventPolicy; import io.cloudevents.CloudEvent; import io.fabric8.kubernetes.client.informers.cache.Lister; import io.micrometer.prometheus.PrometheusConfig; @@ -41,6 +42,7 @@ import io.vertx.core.http.impl.headers.HeadersMultiMap; import io.vertx.micrometer.MicrometerMetricsOptions; import io.vertx.micrometer.backends.BackendRegistries; +import java.util.List; import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; @@ -111,6 +113,11 @@ public Lister getEventTypeLister() { public String getAudience() { return ""; } + + @Override + public List getEventPolicies() { + return null; + } }); verifySetStatusCodeAndTerminateResponse(statusCode, response); @@ -153,6 +160,11 @@ public Lister getEventTypeLister() { public String getAudience() { return ""; } + + @Override + public List getEventPolicies() { + return null; + } }); verifySetStatusCodeAndTerminateResponse(IngressRequestHandlerImpl.MAPPER_FAILED, response); diff --git a/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactoryTest.java b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactoryTest.java index fa28453b14..c88a31a65a 100644 --- a/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactoryTest.java +++ b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactoryTest.java @@ -22,8 +22,8 @@ import dev.knative.eventing.kafka.broker.core.eventtype.EventType; import dev.knative.eventing.kafka.broker.core.eventtype.EventTypeListerFactory; import dev.knative.eventing.kafka.broker.core.metrics.Metrics; -import dev.knative.eventing.kafka.broker.core.oidc.OIDCDiscoveryConfigListener; import dev.knative.eventing.kafka.broker.receiver.MockReactiveProducerFactory; +import dev.knative.eventing.kafka.broker.receiver.impl.auth.OIDCDiscoveryConfigListener; import io.fabric8.kubernetes.client.KubernetesClient; import io.micrometer.core.instrument.MeterRegistry; import io.vertx.core.Vertx; diff --git a/data-plane/tests/src/test/java/dev/knative/eventing/kafka/broker/tests/AbstractDataPlaneTest.java b/data-plane/tests/src/test/java/dev/knative/eventing/kafka/broker/tests/AbstractDataPlaneTest.java index d18e042724..8e8fb44a46 100644 --- a/data-plane/tests/src/test/java/dev/knative/eventing/kafka/broker/tests/AbstractDataPlaneTest.java +++ b/data-plane/tests/src/test/java/dev/knative/eventing/kafka/broker/tests/AbstractDataPlaneTest.java @@ -33,7 +33,6 @@ import dev.knative.eventing.kafka.broker.core.eventbus.ContractPublisher; import dev.knative.eventing.kafka.broker.core.eventtype.EventTypeListerFactory; import dev.knative.eventing.kafka.broker.core.metrics.Metrics; -import dev.knative.eventing.kafka.broker.core.oidc.OIDCDiscoveryConfigListener; import dev.knative.eventing.kafka.broker.core.reconciler.impl.ResourcesReconcilerMessageHandler; import dev.knative.eventing.kafka.broker.core.security.AuthProvider; import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.CloudEventDeserializer; @@ -45,6 +44,7 @@ import dev.knative.eventing.kafka.broker.receiver.impl.IngressProducerReconcilableStore; import dev.knative.eventing.kafka.broker.receiver.impl.ReceiverVerticle; import dev.knative.eventing.kafka.broker.receiver.impl.StrictRequestToRecordMapper; +import dev.knative.eventing.kafka.broker.receiver.impl.auth.OIDCDiscoveryConfigListener; import dev.knative.eventing.kafka.broker.receiver.impl.handler.IngressRequestHandlerImpl; import dev.knative.eventing.kafka.broker.receiver.main.ReceiverEnv; import io.cloudevents.core.builder.CloudEventBuilder;