Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bearer token support for Kafka broker to authenticate to use JWKS / Introspection endpoints #217

Merged
merged 1 commit into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import java.util.Properties;
import java.util.Set;

import static io.strimzi.kafka.oauth.common.Config.OAUTH_HTTP_RETRY_PAUSE_MILLIS;
import static io.strimzi.kafka.oauth.common.ConfigUtil.getConnectTimeout;
import static io.strimzi.kafka.oauth.common.ConfigUtil.getReadTimeout;
import static io.strimzi.kafka.oauth.common.DeprecationUtil.isAccessTokenJwt;
Expand Down Expand Up @@ -246,14 +245,14 @@ private int getHttpRetries(ClientConfig config) {
}

private long getHttpRetryPauseMillis(ClientConfig config, int retries) {
long retryPauseMillis = config.getValueAsLong(OAUTH_HTTP_RETRY_PAUSE_MILLIS, 0);
long retryPauseMillis = config.getValueAsLong(Config.OAUTH_HTTP_RETRY_PAUSE_MILLIS, 0);
if (retries > 0) {
if (retryPauseMillis < 0) {
retryPauseMillis = 0;
LOG.warn("The configured value of '{}' is less than zero and will be ignored", OAUTH_HTTP_RETRY_PAUSE_MILLIS);
LOG.warn("The configured value of '{}' is less than zero and will be ignored", Config.OAUTH_HTTP_RETRY_PAUSE_MILLIS);
}
if (retryPauseMillis <= 0) {
LOG.warn("No pause between http retries configured. Consider setting '{}' to greater than zero to avoid flooding the authorization server with requests.", OAUTH_HTTP_RETRY_PAUSE_MILLIS);
LOG.warn("No pause between http retries configured. Consider setting '{}' to greater than zero to avoid flooding the authorization server with requests.", Config.OAUTH_HTTP_RETRY_PAUSE_MILLIS);
}
}
return retryPauseMillis;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,9 @@ public String token() {
throw new IllegalStateException(e);
}
}

@Override
public String toString() {
return "FileBasedTokenProvider: {path: '" + filePath + "'}";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ public class LogUtil {

/**
* Return masked input text.
*
* <p>
* Masking checks the length of input. If less than 8 it returns '**********'.
* If less than 20 it prints out first letter in clear text, and then additional 9x '*' irrespective of actual input size.
* If input length is greater than 20 chars, it prints out first 4 in clear text followed by '***..***' followed by last 4.
*
* <p>
* The idea is to give some information for debugging while not leaking too much information about secrets.
*
* @param input String with sensitive date which should be masked
Expand All @@ -33,9 +33,21 @@ public static String mask(String input) {
}

if (len < 20) {
return "" + input.charAt(0) + "*********";
return input.charAt(0) + "*********";
}

return input.substring(0, 4) + "**" + input.substring(len - 4, len);
}

/**
* Wrap the value in single quotes, or return null if value is null
*
* @param value The value to wrap in single quotes
* @return The quoted value
*/
public static String singleQuote(String value) {
if (value == null)
return null;
return "'" + value + "'";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package io.strimzi.kafka.oauth.common;

import static io.strimzi.kafka.oauth.common.LogUtil.mask;

/**
* A TokenProvider that contains an immutable token that is returned every time a {@link io.strimzi.kafka.oauth.common.StaticTokenProvider#token()} method is called.
*/
Expand All @@ -24,4 +26,9 @@ public StaticTokenProvider(final String token) {
public String token() {
return token;
}

@Override
public String toString() {
return "StaticTokenProvider: {token: '" + mask(token) + "'}";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,9 @@ public boolean equals(Object o) {
public int hashCode() {
return Objects.hash(configId);
}

@Override
public String toString() {
return "ConfigurationKey {configId: " + configId + ", validatorKey: " + validatorKey + "}";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,20 @@

import java.util.Objects;

import static io.strimzi.kafka.oauth.common.LogUtil.mask;
import static io.strimzi.kafka.oauth.common.LogUtil.singleQuote;

/**
* The class that holds the validator configuration and is used to compare different configurations for equality.
* It also calculates a unique identifier based on the configuration that is stable across application restarts.
*
* <p>
* This mechanism allows sharing a single validator across multiple listeners, as long as they are configured with same config parameter values
*/
public class ValidatorKey {

private final String clientId;
private final String clientSecret;
private final String bearerToken;
private final String validIssuerUri;
private final String audience;
private final String customClaimCheck;
Expand All @@ -39,7 +45,10 @@ public class ValidatorKey {
private final String configIdHash;

@SuppressWarnings("checkstyle:ParameterNumber")
ValidatorKey(String validIssuerUri,
ValidatorKey(String clientId,
String clientSecret,
String bearerToken,
String validIssuerUri,
String audience,
String customClaimCheck,
String usernameClaim,
Expand All @@ -57,6 +66,9 @@ public class ValidatorKey {
boolean enableMetrics,
boolean includeAcceptHeader) {

this.clientId = clientId;
this.clientSecret = clientSecret;
this.bearerToken = bearerToken;
this.validIssuerUri = validIssuerUri;
this.audience = audience;
this.customClaimCheck = customClaimCheck;
Expand All @@ -75,7 +87,11 @@ public class ValidatorKey {
this.enableMetrics = enableMetrics;
this.includeAcceptHeader = includeAcceptHeader;

this.configIdHash = IOUtil.hashForObjects(validIssuerUri,
this.configIdHash = IOUtil.hashForObjects(
clientId,
clientSecret,
bearerToken,
validIssuerUri,
audience,
customClaimCheck,
usernameClaim,
Expand All @@ -94,12 +110,16 @@ public class ValidatorKey {
includeAcceptHeader);
}

@SuppressWarnings({"checkstyle:CyclomaticComplexity"})
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof ValidatorKey)) return false;
ValidatorKey that = (ValidatorKey) o;
return hasHostnameVerifier == that.hasHostnameVerifier &&
return Objects.equals(clientId, that.clientId) &&
Objects.equals(clientSecret, that.clientSecret) &&
Objects.equals(bearerToken, that.bearerToken) &&
hasHostnameVerifier == that.hasHostnameVerifier &&
Objects.equals(validIssuerUri, that.validIssuerUri) &&
Objects.equals(audience, that.audience) &&
Objects.equals(customClaimCheck, that.customClaimCheck) &&
Expand All @@ -120,7 +140,11 @@ public boolean equals(Object o) {

@Override
public int hashCode() {
return Objects.hash(validIssuerUri,
return Objects.hash(
clientId,
clientSecret,
bearerToken,
validIssuerUri,
audience,
customClaimCheck,
usernameClaim,
Expand Down Expand Up @@ -166,6 +190,9 @@ public static class JwtValidatorKey extends ValidatorKey {
/**
* Create a new instance. Arguments have to include all validator config options.
*
* @param clientId clientId
* @param clientSecret clientSecret
* @param bearerToken bearerToken
* @param validIssuerUri validIssuerUri
* @param audience audience
* @param customClaimCheck customClaimCheck
Expand All @@ -192,7 +219,10 @@ public static class JwtValidatorKey extends ValidatorKey {
* @param includeAcceptHeader includeAcceptHeader
*/
@SuppressWarnings("checkstyle:parameternumber")
public JwtValidatorKey(String validIssuerUri,
public JwtValidatorKey(String clientId,
String clientSecret,
String bearerToken,
String validIssuerUri,
String audience,
String customClaimCheck,
String usernameClaim,
Expand All @@ -218,7 +248,10 @@ public JwtValidatorKey(String validIssuerUri,
boolean failFast,
boolean includeAcceptHeader) {

super(validIssuerUri,
super(clientId,
clientSecret,
bearerToken,
validIssuerUri,
audience,
customClaimCheck,
usernameClaim,
Expand Down Expand Up @@ -284,6 +317,38 @@ public int hashCode() {
public String getConfigIdHash() {
return configIdHash;
}

@Override
public String toString() {
return "JwtValidatorKey {clientId: " + singleQuote(super.clientId)
+ ", clientSecret: " + singleQuote(mask(super.clientSecret))
+ ", bearerToken: " + singleQuote(mask(super.bearerToken))
+ ", validIssuerUri: " + singleQuote(super.validIssuerUri)
+ ", audience: " + singleQuote(super.audience)
+ ", customClaimCheck: " + singleQuote(super.customClaimCheck)
+ ", usernameClaim: " + singleQuote(super.usernameClaim)
+ ", fallbackUsernameClaim: " + singleQuote(super.fallbackUsernameClaim)
+ ", fallbackUsernamePrefix: " + singleQuote(super.fallbackUsernamePrefix)
+ ", groupQuery: " + singleQuote(super.groupQuery)
+ ", groupDelimiter: [" + super.groupDelimiter
+ "], sslTruststore: " + singleQuote(super.sslTruststore)
+ ", sslStorePassword: " + singleQuote(mask(super.sslStorePassword))
+ ", sslStoreType: " + singleQuote(super.sslStoreType)
+ ", sslRandom: " + singleQuote(super.sslRandom)
+ ", hasHostnameVerifier: " + super.hasHostnameVerifier
+ ", connectTimeout: " + super.connectTimeout
+ ", readTimeout: " + super.readTimeout
+ ", enableMetrics: " + super.enableMetrics
+ ", includeAcceptHeader: " + super.includeAcceptHeader
+ ", jwksEndpointUri: " + singleQuote(jwksEndpointUri)
+ ", jwksRefreshSeconds: " + jwksRefreshSeconds
+ ", jwksExpirySeconds: " + jwksExpirySeconds
+ ", jwksRefreshMinPauseSeconds: " + jwksRefreshMinPauseSeconds
+ ", jwksIgnoreKeyUse: " + jwksIgnoreKeyUse
+ ", checkAccessTokenType: " + checkAccessTokenType
+ ", failFast: " + failFast
+ "}";
}
}

/**
Expand All @@ -294,15 +359,16 @@ public static class IntrospectionValidatorKey extends ValidatorKey {
private final String introspectionEndpoint;
private final String userInfoEndpoint;
private final String validTokenType;
private final String clientId;
private final String clientSecret;
private final int retries;
private final long retryPauseMillis;
private final String configIdHash;

/**
* Create a new instance. Arguments have to include all validator config options.
*
* @param clientId clientId
* @param clientSecret clientSecret
* @param bearerToken bearerToken
* @param validIssuerUri validIssuerUri
* @param audience audience
* @param customClaimCheck customClaimCheck
Expand All @@ -319,8 +385,6 @@ public static class IntrospectionValidatorKey extends ValidatorKey {
* @param introspectionEndpoint introspectionEndpoint
* @param userInfoEndpoint userInfoEndpoint
* @param validTokenType validTokenType
* @param clientId clientId
* @param clientSecret clientSecret
* @param connectTimeout connectTimeout
* @param readTimeout readTimeout
* @param enableMetrics enableMetrics
Expand All @@ -329,7 +393,10 @@ public static class IntrospectionValidatorKey extends ValidatorKey {
* @param includeAcceptHeader includeAcceptHeader
*/
@SuppressWarnings("checkstyle:parameternumber")
public IntrospectionValidatorKey(String validIssuerUri,
public IntrospectionValidatorKey(String clientId,
String clientSecret,
String bearerToken,
String validIssuerUri,
String audience,
String customClaimCheck,
String usernameClaim,
Expand All @@ -346,16 +413,17 @@ public IntrospectionValidatorKey(String validIssuerUri,
String introspectionEndpoint,
String userInfoEndpoint,
String validTokenType,
String clientId,
String clientSecret,
int connectTimeout,
int readTimeout,
boolean enableMetrics,
int retries,
long retryPauseMillis,
boolean includeAcceptHeader) {

super(validIssuerUri,
super(clientId,
clientSecret,
bearerToken,
validIssuerUri,
audience,
customClaimCheck,
usernameClaim,
Expand All @@ -375,8 +443,6 @@ public IntrospectionValidatorKey(String validIssuerUri,
this.introspectionEndpoint = introspectionEndpoint;
this.userInfoEndpoint = userInfoEndpoint;
this.validTokenType = validTokenType;
this.clientId = clientId;
this.clientSecret = clientSecret;
this.retries = retries;
this.retryPauseMillis = retryPauseMillis;

Expand All @@ -399,8 +465,6 @@ public boolean equals(Object o) {
return Objects.equals(introspectionEndpoint, that.introspectionEndpoint) &&
Objects.equals(userInfoEndpoint, that.userInfoEndpoint) &&
Objects.equals(validTokenType, that.validTokenType) &&
Objects.equals(clientId, that.clientId) &&
Objects.equals(clientSecret, that.clientSecret) &&
Objects.equals(retries, that.retries) &&
Objects.equals(retryPauseMillis, that.retryPauseMillis);
}
Expand All @@ -411,8 +475,6 @@ public int hashCode() {
introspectionEndpoint,
userInfoEndpoint,
validTokenType,
clientId,
clientSecret,
retries,
retryPauseMillis);
}
Expand All @@ -421,5 +483,35 @@ public int hashCode() {
public String getConfigIdHash() {
return configIdHash;
}

@Override
public String toString() {
return "IntrospectionValidatorKey {clientId: " + singleQuote(super.clientId)
+ ", clientSecret: " + singleQuote(mask(super.clientSecret))
+ ", bearerToken: " + singleQuote(mask(super.bearerToken))
+ ", validIssuerUri: " + singleQuote(super.validIssuerUri)
+ ", audience: " + singleQuote(super.audience)
+ ", customClaimCheck: " + singleQuote(super.customClaimCheck)
+ ", usernameClaim: " + singleQuote(super.usernameClaim)
+ ", fallbackUsernameClaim: " + singleQuote(super.fallbackUsernameClaim)
+ ", fallbackUsernamePrefix: " + singleQuote(super.fallbackUsernamePrefix)
+ ", groupQuery: " + singleQuote(super.groupQuery)
+ ", groupDelimiter: [" + super.groupDelimiter
+ "], sslTruststore: " + singleQuote(super.sslTruststore)
+ ", sslStorePassword: " + singleQuote(mask(super.sslStorePassword))
+ ", sslStoreType: " + singleQuote(super.sslStoreType)
+ ", sslRandom: " + singleQuote(super.sslRandom)
+ ", hasHostnameVerifier: " + super.hasHostnameVerifier
+ ", connectTimeout: " + super.connectTimeout
+ ", readTimeout: " + super.readTimeout
+ ", enableMetrics: " + super.enableMetrics
+ ", includeAcceptHeader: " + super.includeAcceptHeader
+ ", introspectionEndpoint: " + singleQuote(introspectionEndpoint)
+ ", userInfoEndpoint: " + singleQuote(userInfoEndpoint)
+ ", validTokenType: " + singleQuote(validTokenType)
+ ", retries: " + retries
+ ", retryPauseMillis: " + retryPauseMillis
+ "}";
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public TokenValidator get(ConfigurationKey key, Supplier<TokenValidator> factory
// If key with the same configId exists already it has to have an equal validatorKey (the same configuration)
// In that case, the existing ValidatorEntry will be reused
if (!key.getValidatorKey().equals(previous.key.getValidatorKey())) {
throw new ConfigException("Configuration id " + key.getConfigId() + " with different configuration has already been assigned");
throw new ConfigException("Configuration id '" + key.getConfigId() + "' with different configuration has already been assigned (" + previous.key + "\n\tversus:\n\t" + key + ")");
}
return previous.validator;
}
Expand Down
Loading
Loading