Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KNOX-3065: SSE support for Knox #947

Merged
merged 1 commit into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions gateway-spi/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,24 @@
<artifactId>gateway-util-urltemplate</artifactId>
</dependency>

<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpasyncclient</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore-nio</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore-nio</artifactId>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,4 +123,19 @@ public interface SpiGatewayMessages {

@Message( level = MessageLevel.ERROR, text = "No valid principal found" )
void noPrincipalFound();

@Message( level = MessageLevel.INFO, text = "Every event was read from the SSE stream" )
void sseConnectionDone();

@Message( level = MessageLevel.ERROR, text = "Error during SSE connection: {0}" )
void sseConnectionError(String error);

@Message( level = MessageLevel.ERROR, text = "Error writing into the SSE output stream : {0}" )
void errorWritingOutputStream(@StackTrace(level=MessageLevel.ERROR) Exception e);

@Message( level = MessageLevel.WARN, text = "SSE connection cancelled" )
void sseConnectionCancelled();

@Message( level = MessageLevel.ERROR, text = "Unable to close SSE producer" )
void sseProducerCloseError(@StackTrace(level=MessageLevel.ERROR) Exception e);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.knox.gateway.dispatch;

import org.apache.http.auth.AuthSchemeProvider;
import org.apache.http.auth.AuthScope;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.config.AuthSchemes;
import org.apache.http.config.Registry;
import org.apache.http.config.RegistryBuilder;
import org.apache.http.impl.DefaultConnectionReuseStrategy;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.DefaultConnectionKeepAliveStrategy;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.apache.http.nio.client.HttpAsyncClient;
import org.apache.knox.gateway.config.GatewayConfig;
import org.apache.knox.gateway.services.GatewayServices;
import org.apache.knox.gateway.services.ServiceType;
import org.apache.knox.gateway.services.metrics.MetricsService;

import javax.net.ssl.SSLContext;
import javax.servlet.FilterConfig;

public class DefaultHttpAsyncClientFactory extends DefaultHttpClientFactory implements HttpAsyncClientFactory {

@Override
public HttpAsyncClient createAsyncHttpClient(FilterConfig filterConfig) {
final String serviceRole = filterConfig.getInitParameter(PARAMETER_SERVICE_ROLE);
HttpAsyncClientBuilder builder;
GatewayConfig gatewayConfig = (GatewayConfig) filterConfig.getServletContext().getAttribute(GatewayConfig.GATEWAY_CONFIG_ATTRIBUTE);
GatewayServices services = (GatewayServices) filterConfig.getServletContext()
.getAttribute(GatewayServices.GATEWAY_SERVICES_ATTRIBUTE);
if (gatewayConfig != null && gatewayConfig.isMetricsEnabled()) {
MetricsService metricsService = services.getService(ServiceType.METRICS_SERVICE);
builder = metricsService.getInstrumented(HttpAsyncClientBuilder.class);
} else {
builder = HttpAsyncClients.custom();
}

// Conditionally set a custom SSLContext
SSLContext sslContext = createSSLContext(services, filterConfig, serviceRole);
if (sslContext != null) {
builder.setSSLContext(sslContext);
}

if (Boolean.parseBoolean(System.getProperty(GatewayConfig.HADOOP_KERBEROS_SECURED))) {
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new DefaultHttpAsyncClientFactory.UseJaasCredentials());

Registry<AuthSchemeProvider> authSchemeRegistry = RegistryBuilder.<AuthSchemeProvider>create()
.register(AuthSchemes.SPNEGO, new KnoxSpnegoAuthSchemeFactory(true))
.build();

builder.setDefaultAuthSchemeRegistry(authSchemeRegistry)
.setDefaultCookieStore(new HadoopAuthCookieStore(gatewayConfig))
.setDefaultCredentialsProvider(credentialsProvider);
} else {
builder.setDefaultCookieStore(new DefaultHttpAsyncClientFactory.NoCookieStore());
}

builder.setKeepAliveStrategy(DefaultConnectionKeepAliveStrategy.INSTANCE);
builder.setConnectionReuseStrategy(DefaultConnectionReuseStrategy.INSTANCE);
builder.setRedirectStrategy(new DefaultHttpAsyncClientFactory.NeverRedirectStrategy());
int maxConnections = getMaxConnections(filterConfig);
builder.setMaxConnTotal(maxConnections);
builder.setMaxConnPerRoute(maxConnections);
builder.setDefaultRequestConfig(getRequestConfig(filterConfig, serviceRole));

return builder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@

public class DefaultHttpClientFactory implements HttpClientFactory {
private static final SpiGatewayMessages LOG = MessagesFactory.get(SpiGatewayMessages.class);
private static final String PARAMETER_SERVICE_ROLE = "serviceRole";
protected static final String PARAMETER_SERVICE_ROLE = "serviceRole";
static final String PARAMETER_USE_TWO_WAY_SSL = "useTwoWaySsl";
/* retry in case of NoHttpResponseException */
static final String PARAMETER_RETRY_COUNT = "retryCount";
Expand Down Expand Up @@ -255,7 +255,7 @@ static RequestConfig getRequestConfig(FilterConfig config, String serviceRole) {
return builder.build();
}

private static class NoCookieStore implements CookieStore {
protected static class NoCookieStore implements CookieStore {
@Override
public void addCookie(Cookie cookie) {
//no op
Expand All @@ -277,7 +277,7 @@ public void clear() {
}
}

private static class NeverRedirectStrategy implements RedirectStrategy {
protected static class NeverRedirectStrategy implements RedirectStrategy {
@Override
public boolean isRedirected( HttpRequest request, HttpResponse response, HttpContext context )
throws ProtocolException {
Expand All @@ -298,7 +298,7 @@ public boolean retryRequest( IOException exception, int executionCount, HttpCont
}
}

private static class UseJaasCredentials implements Credentials {
protected static class UseJaasCredentials implements Credentials {

@Override
public String getPassword() {
Expand All @@ -312,7 +312,7 @@ public Principal getUserPrincipal() {

}

private int getMaxConnections( FilterConfig filterConfig ) {
protected int getMaxConnections( FilterConfig filterConfig ) {
int maxConnections = 32;
GatewayConfig config =
(GatewayConfig)filterConfig.getServletContext().getAttribute( GatewayConfig.GATEWAY_CONFIG_ATTRIBUTE );
Expand All @@ -330,7 +330,7 @@ private int getMaxConnections( FilterConfig filterConfig ) {
return maxConnections;
}

private static int getConnectionTimeout( FilterConfig filterConfig ) {
protected static int getConnectionTimeout( FilterConfig filterConfig ) {
int timeout = -1;
GatewayConfig globalConfig =
(GatewayConfig)filterConfig.getServletContext().getAttribute( GatewayConfig.GATEWAY_CONFIG_ATTRIBUTE );
Expand All @@ -348,7 +348,7 @@ private static int getConnectionTimeout( FilterConfig filterConfig ) {
return timeout;
}

private static int getSocketTimeout( FilterConfig filterConfig ) {
protected static int getSocketTimeout( FilterConfig filterConfig ) {
int timeout = -1;
GatewayConfig globalConfig =
(GatewayConfig)filterConfig.getServletContext().getAttribute( GatewayConfig.GATEWAY_CONFIG_ATTRIBUTE );
Expand All @@ -366,7 +366,7 @@ private static int getSocketTimeout( FilterConfig filterConfig ) {
return timeout;
}

private static long parseTimeout( String s ) {
protected static long parseTimeout( String s ) {
PeriodFormatter f = new PeriodFormatterBuilder()
.appendMinutes().appendSuffix("m"," min")
.appendSeconds().appendSuffix("s"," sec")
Expand All @@ -375,7 +375,7 @@ private static long parseTimeout( String s ) {
return p.toStandardDuration().getMillis();
}

private static String getCookieSpec(FilterConfig filterConfig) {
protected static String getCookieSpec(FilterConfig filterConfig) {
String cookieSpec = filterConfig.getInitParameter("httpclient.cookieSpec");
if (StringUtils.isNotBlank(cookieSpec)) {
return cookieSpec;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.util.Collections;
Expand Down Expand Up @@ -71,13 +72,13 @@ public void init(FilterConfig filterConfig) throws ServletException {
synchronized(lock) {
if (dispatch == null) {
String dispatchImpl = filterConfig.getInitParameter("dispatch-impl");
dispatch = newInstanceFromName(dispatchImpl);
dispatch = newInstanceFromName(dispatchImpl, filterConfig);
}
ConfigurationInjectorBuilder.configuration().target(dispatch).source(filterConfig).inject();
HttpClientFactory httpClientFactory;
String httpClientFactoryClass = filterConfig.getInitParameter("httpClientFactory");
if (httpClientFactoryClass != null) {
httpClientFactory = newInstanceFromName(httpClientFactoryClass);
httpClientFactory = newInstanceFromName(httpClientFactoryClass, filterConfig);
} else {
httpClientFactory = new DefaultHttpClientFactory();
}
Expand Down Expand Up @@ -217,15 +218,28 @@ public void doMethod(Dispatch dispatch, HttpServletRequest request, HttpServletR
}
}

private <T> T newInstanceFromName(String dispatchImpl) throws ServletException {
private <T> T newInstanceFromName(String dispatchImpl, FilterConfig filterConfig) throws ServletException {
try {
Class<T> clazz = loadClass(dispatchImpl);
Constructor<?> constructor = this.getConstructorWithType(clazz, FilterConfig.class);
if(constructor != null) {
return (T) constructor.newInstance(filterConfig);
}
return clazz.newInstance();
} catch ( Exception e ) {
throw new ServletException(e);
}
}

private <T> Constructor<?> getConstructorWithType(Class<T> clazz, Class<?> type) {
for (Constructor<?> constructor : clazz.getConstructors()) {
if (constructor.getParameterCount() == 1 && constructor.getParameterTypes()[0] == type) {
return constructor;
}
}
return null;
}

private <T> Class<T> loadClass(String className) throws ClassNotFoundException {
ClassLoader loader = Thread.currentThread().getContextClassLoader();
if ( loader == null ) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.knox.gateway.dispatch;

import org.apache.http.nio.client.HttpAsyncClient;

import javax.servlet.FilterConfig;

public interface HttpAsyncClientFactory extends HttpClientFactory {
HttpAsyncClient createAsyncHttpClient(FilterConfig filterConfig);
}
Loading