Skip to content

Commit

Permalink
KNOX-3065: SSE support for Knox
Browse files Browse the repository at this point in the history
  • Loading branch information
hanicz committed Nov 9, 2024
1 parent dceb495 commit 39e5197
Show file tree
Hide file tree
Showing 16 changed files with 1,656 additions and 12 deletions.
18 changes: 18 additions & 0 deletions gateway-spi/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,24 @@
<artifactId>gateway-util-urltemplate</artifactId>
</dependency>

<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpasyncclient</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore-nio</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore-nio</artifactId>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,4 +123,19 @@ public interface SpiGatewayMessages {

@Message( level = MessageLevel.ERROR, text = "No valid principal found" )
void noPrincipalFound();

@Message( level = MessageLevel.INFO, text = "Every event was read from the SSE stream" )
void sseConnectionDone();

@Message( level = MessageLevel.ERROR, text = "Error during SSE connection: {0}" )
void sseConnectionError(String error);

@Message( level = MessageLevel.ERROR, text = "Error writing into the SSE output stream : {1}" )
void errorWritingOutputStream(@StackTrace(level=MessageLevel.ERROR) Exception e);

@Message( level = MessageLevel.WARN, text = "SSE connection cancelled" )
void sseConnectionCancelled();

@Message( level = MessageLevel.ERROR, text = "Unable to close SSE producer" )
void sseProducerCloseError(@StackTrace(level=MessageLevel.ERROR) Exception e);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.knox.gateway.dispatch;

import org.apache.http.auth.AuthSchemeProvider;
import org.apache.http.auth.AuthScope;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.config.AuthSchemes;
import org.apache.http.config.Registry;
import org.apache.http.config.RegistryBuilder;
import org.apache.http.impl.DefaultConnectionReuseStrategy;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.DefaultConnectionKeepAliveStrategy;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.apache.http.nio.client.HttpAsyncClient;
import org.apache.knox.gateway.config.GatewayConfig;
import org.apache.knox.gateway.services.GatewayServices;
import org.apache.knox.gateway.services.ServiceType;
import org.apache.knox.gateway.services.metrics.MetricsService;

import javax.net.ssl.SSLContext;
import javax.servlet.FilterConfig;

public class DefaultHttpAsyncClientFactory extends DefaultHttpClientFactory implements HttpAsyncClientFactory {

@Override
public HttpAsyncClient createAsyncHttpClient(FilterConfig filterConfig) {
final String serviceRole = filterConfig.getInitParameter(PARAMETER_SERVICE_ROLE);
HttpAsyncClientBuilder builder;
GatewayConfig gatewayConfig = (GatewayConfig) filterConfig.getServletContext().getAttribute(GatewayConfig.GATEWAY_CONFIG_ATTRIBUTE);
GatewayServices services = (GatewayServices) filterConfig.getServletContext()
.getAttribute(GatewayServices.GATEWAY_SERVICES_ATTRIBUTE);
if (gatewayConfig != null && gatewayConfig.isMetricsEnabled()) {
MetricsService metricsService = services.getService(ServiceType.METRICS_SERVICE);
builder = metricsService.getInstrumented(HttpAsyncClientBuilder.class);
} else {
builder = HttpAsyncClients.custom();
}

// Conditionally set a custom SSLContext
SSLContext sslContext = createSSLContext(services, filterConfig, serviceRole);
if (sslContext != null) {
builder.setSSLContext(sslContext);
}

if (Boolean.parseBoolean(System.getProperty(GatewayConfig.HADOOP_KERBEROS_SECURED))) {
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new DefaultHttpAsyncClientFactory.UseJaasCredentials());

Registry<AuthSchemeProvider> authSchemeRegistry = RegistryBuilder.<AuthSchemeProvider>create()
.register(AuthSchemes.SPNEGO, new KnoxSpnegoAuthSchemeFactory(true))
.build();

builder.setDefaultAuthSchemeRegistry(authSchemeRegistry)
.setDefaultCookieStore(new HadoopAuthCookieStore(gatewayConfig))
.setDefaultCredentialsProvider(credentialsProvider);
} else {
builder.setDefaultCookieStore(new DefaultHttpAsyncClientFactory.NoCookieStore());
}

builder.setKeepAliveStrategy(DefaultConnectionKeepAliveStrategy.INSTANCE);
builder.setConnectionReuseStrategy(DefaultConnectionReuseStrategy.INSTANCE);
builder.setRedirectStrategy(new DefaultHttpAsyncClientFactory.NeverRedirectStrategy());
int maxConnections = getMaxConnections(filterConfig);
builder.setMaxConnTotal(maxConnections);
builder.setMaxConnPerRoute(maxConnections);
builder.setDefaultRequestConfig(getRequestConfig(filterConfig, serviceRole));

return builder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@

public class DefaultHttpClientFactory implements HttpClientFactory {
private static final SpiGatewayMessages LOG = MessagesFactory.get(SpiGatewayMessages.class);
private static final String PARAMETER_SERVICE_ROLE = "serviceRole";
protected static final String PARAMETER_SERVICE_ROLE = "serviceRole";
static final String PARAMETER_USE_TWO_WAY_SSL = "useTwoWaySsl";
/* retry in case of NoHttpResponseException */
static final String PARAMETER_RETRY_COUNT = "retryCount";
Expand Down Expand Up @@ -255,7 +255,7 @@ static RequestConfig getRequestConfig(FilterConfig config, String serviceRole) {
return builder.build();
}

private static class NoCookieStore implements CookieStore {
protected static class NoCookieStore implements CookieStore {
@Override
public void addCookie(Cookie cookie) {
//no op
Expand All @@ -277,7 +277,7 @@ public void clear() {
}
}

private static class NeverRedirectStrategy implements RedirectStrategy {
protected static class NeverRedirectStrategy implements RedirectStrategy {
@Override
public boolean isRedirected( HttpRequest request, HttpResponse response, HttpContext context )
throws ProtocolException {
Expand All @@ -298,7 +298,7 @@ public boolean retryRequest( IOException exception, int executionCount, HttpCont
}
}

private static class UseJaasCredentials implements Credentials {
protected static class UseJaasCredentials implements Credentials {

@Override
public String getPassword() {
Expand All @@ -312,7 +312,7 @@ public Principal getUserPrincipal() {

}

private int getMaxConnections( FilterConfig filterConfig ) {
protected int getMaxConnections( FilterConfig filterConfig ) {
int maxConnections = 32;
GatewayConfig config =
(GatewayConfig)filterConfig.getServletContext().getAttribute( GatewayConfig.GATEWAY_CONFIG_ATTRIBUTE );
Expand All @@ -330,7 +330,7 @@ private int getMaxConnections( FilterConfig filterConfig ) {
return maxConnections;
}

private static int getConnectionTimeout( FilterConfig filterConfig ) {
protected static int getConnectionTimeout( FilterConfig filterConfig ) {
int timeout = -1;
GatewayConfig globalConfig =
(GatewayConfig)filterConfig.getServletContext().getAttribute( GatewayConfig.GATEWAY_CONFIG_ATTRIBUTE );
Expand All @@ -348,7 +348,7 @@ private static int getConnectionTimeout( FilterConfig filterConfig ) {
return timeout;
}

private static int getSocketTimeout( FilterConfig filterConfig ) {
protected static int getSocketTimeout( FilterConfig filterConfig ) {
int timeout = -1;
GatewayConfig globalConfig =
(GatewayConfig)filterConfig.getServletContext().getAttribute( GatewayConfig.GATEWAY_CONFIG_ATTRIBUTE );
Expand All @@ -366,7 +366,7 @@ private static int getSocketTimeout( FilterConfig filterConfig ) {
return timeout;
}

private static long parseTimeout( String s ) {
protected static long parseTimeout( String s ) {
PeriodFormatter f = new PeriodFormatterBuilder()
.appendMinutes().appendSuffix("m"," min")
.appendSeconds().appendSuffix("s"," sec")
Expand All @@ -375,7 +375,7 @@ private static long parseTimeout( String s ) {
return p.toStandardDuration().getMillis();
}

private static String getCookieSpec(FilterConfig filterConfig) {
protected static String getCookieSpec(FilterConfig filterConfig) {
String cookieSpec = filterConfig.getInitParameter("httpclient.cookieSpec");
if (StringUtils.isNotBlank(cookieSpec)) {
return cookieSpec;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.util.Collections;
Expand Down Expand Up @@ -71,13 +72,13 @@ public void init(FilterConfig filterConfig) throws ServletException {
synchronized(lock) {
if (dispatch == null) {
String dispatchImpl = filterConfig.getInitParameter("dispatch-impl");
dispatch = newInstanceFromName(dispatchImpl);
dispatch = newInstanceFromName(dispatchImpl, filterConfig);
}
ConfigurationInjectorBuilder.configuration().target(dispatch).source(filterConfig).inject();
HttpClientFactory httpClientFactory;
String httpClientFactoryClass = filterConfig.getInitParameter("httpClientFactory");
if (httpClientFactoryClass != null) {
httpClientFactory = newInstanceFromName(httpClientFactoryClass);
httpClientFactory = newInstanceFromName(httpClientFactoryClass, filterConfig);
} else {
httpClientFactory = new DefaultHttpClientFactory();
}
Expand Down Expand Up @@ -217,15 +218,28 @@ public void doMethod(Dispatch dispatch, HttpServletRequest request, HttpServletR
}
}

private <T> T newInstanceFromName(String dispatchImpl) throws ServletException {
private <T> T newInstanceFromName(String dispatchImpl, FilterConfig filterConfig) throws ServletException {
try {
Class<T> clazz = loadClass(dispatchImpl);
Constructor<?> constructor = this.getConstructorWithType(clazz, FilterConfig.class);
if(constructor != null) {
return (T) constructor.newInstance(filterConfig);
}
return clazz.newInstance();
} catch ( Exception e ) {
throw new ServletException(e);
}
}

private <T> Constructor<?> getConstructorWithType(Class<T> clazz, Class<?> type) {
for (Constructor<?> constructor : clazz.getConstructors()) {
if (constructor.getParameterCount() == 1 && constructor.getParameterTypes()[0] == type) {
return constructor;
}
}
return null;
}

private <T> Class<T> loadClass(String className) throws ClassNotFoundException {
ClassLoader loader = Thread.currentThread().getContextClassLoader();
if ( loader == null ) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.knox.gateway.dispatch;

import org.apache.http.nio.client.HttpAsyncClient;

import javax.servlet.FilterConfig;

public interface HttpAsyncClientFactory extends HttpClientFactory {
HttpAsyncClient createAsyncHttpClient(FilterConfig filterConfig);
}
Loading

0 comments on commit 39e5197

Please sign in to comment.