From ec56882125e516bfaa008795604d63a943486eef Mon Sep 17 00:00:00 2001 From: thanicz Date: Tue, 22 Oct 2024 13:18:36 +0200 Subject: [PATCH] KNOX-3065: SSE support for Knox --- gateway-spi/pom.xml | 18 + .../knox/gateway/SpiGatewayMessages.java | 15 + .../DefaultHttpAsyncClientFactory.java | 87 ++++ .../dispatch/DefaultHttpClientFactory.java | 18 +- .../dispatch/GatewayDispatchFilter.java | 20 +- .../dispatch/HttpAsyncClientFactory.java | 26 + .../apache/knox/gateway/sse/SSEDispatch.java | 234 +++++++++ .../apache/knox/gateway/sse/SSEEntity.java | 150 ++++++ .../apache/knox/gateway/sse/SSEException.java | 25 + .../apache/knox/gateway/sse/SSEResponse.java | 158 ++++++ .../org/apache/knox/gateway/sse/SSEvent.java | 101 ++++ .../DefaultHttpAsyncClientFactoryTest.java | 76 +++ .../knox/gateway/sse/SSEDispatchTest.java | 483 ++++++++++++++++++ .../knox/gateway/sse/SSEEntityTest.java | 163 ++++++ .../apache/knox/gateway/sse/SSEventTest.java | 83 +++ pom.xml | 11 + 16 files changed, 1656 insertions(+), 12 deletions(-) create mode 100644 gateway-spi/src/main/java/org/apache/knox/gateway/dispatch/DefaultHttpAsyncClientFactory.java create mode 100644 gateway-spi/src/main/java/org/apache/knox/gateway/dispatch/HttpAsyncClientFactory.java create mode 100644 gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEDispatch.java create mode 100644 gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEEntity.java create mode 100644 gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEException.java create mode 100644 gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEResponse.java create mode 100644 gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEvent.java create mode 100644 gateway-spi/src/test/java/org/apache/knox/gateway/dispatch/DefaultHttpAsyncClientFactoryTest.java create mode 100644 gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEDispatchTest.java create mode 100644 gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEEntityTest.java create mode 100644 gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEventTest.java diff --git a/gateway-spi/pom.xml b/gateway-spi/pom.xml index b6151b0bf2..d09c39bf4e 100644 --- a/gateway-spi/pom.xml +++ b/gateway-spi/pom.xml @@ -55,6 +55,24 @@ gateway-util-urltemplate + + org.apache.httpcomponents + httpasyncclient + + + org.apache.httpcomponents + httpcore + + + org.apache.httpcomponents + httpcore-nio + + + + + org.apache.httpcomponents + httpcore-nio + javax.servlet javax.servlet-api diff --git a/gateway-spi/src/main/java/org/apache/knox/gateway/SpiGatewayMessages.java b/gateway-spi/src/main/java/org/apache/knox/gateway/SpiGatewayMessages.java index 771e616f4c..4440069d8e 100644 --- a/gateway-spi/src/main/java/org/apache/knox/gateway/SpiGatewayMessages.java +++ b/gateway-spi/src/main/java/org/apache/knox/gateway/SpiGatewayMessages.java @@ -123,4 +123,19 @@ public interface SpiGatewayMessages { @Message( level = MessageLevel.ERROR, text = "No valid principal found" ) void noPrincipalFound(); + + @Message( level = MessageLevel.INFO, text = "Every event was read from the SSE stream" ) + void sseConnectionDone(); + + @Message( level = MessageLevel.ERROR, text = "Error during SSE connection: {0}" ) + void sseConnectionError(String error); + + @Message( level = MessageLevel.ERROR, text = "Error writing into the SSE output stream : {0}" ) + void errorWritingOutputStream(@StackTrace(level=MessageLevel.ERROR) Exception e); + + @Message( level = MessageLevel.WARN, text = "SSE connection cancelled" ) + void sseConnectionCancelled(); + + @Message( level = MessageLevel.ERROR, text = "Unable to close SSE producer" ) + void sseProducerCloseError(@StackTrace(level=MessageLevel.ERROR) Exception e); } diff --git a/gateway-spi/src/main/java/org/apache/knox/gateway/dispatch/DefaultHttpAsyncClientFactory.java b/gateway-spi/src/main/java/org/apache/knox/gateway/dispatch/DefaultHttpAsyncClientFactory.java new file mode 100644 index 0000000000..24840233fb --- /dev/null +++ b/gateway-spi/src/main/java/org/apache/knox/gateway/dispatch/DefaultHttpAsyncClientFactory.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.knox.gateway.dispatch; + +import org.apache.http.auth.AuthSchemeProvider; +import org.apache.http.auth.AuthScope; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.client.config.AuthSchemes; +import org.apache.http.config.Registry; +import org.apache.http.config.RegistryBuilder; +import org.apache.http.impl.DefaultConnectionReuseStrategy; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.http.impl.client.DefaultConnectionKeepAliveStrategy; +import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; +import org.apache.http.impl.nio.client.HttpAsyncClients; +import org.apache.http.nio.client.HttpAsyncClient; +import org.apache.knox.gateway.config.GatewayConfig; +import org.apache.knox.gateway.services.GatewayServices; +import org.apache.knox.gateway.services.ServiceType; +import org.apache.knox.gateway.services.metrics.MetricsService; + +import javax.net.ssl.SSLContext; +import javax.servlet.FilterConfig; + +public class DefaultHttpAsyncClientFactory extends DefaultHttpClientFactory implements HttpAsyncClientFactory { + + @Override + public HttpAsyncClient createAsyncHttpClient(FilterConfig filterConfig) { + final String serviceRole = filterConfig.getInitParameter(PARAMETER_SERVICE_ROLE); + HttpAsyncClientBuilder builder; + GatewayConfig gatewayConfig = (GatewayConfig) filterConfig.getServletContext().getAttribute(GatewayConfig.GATEWAY_CONFIG_ATTRIBUTE); + GatewayServices services = (GatewayServices) filterConfig.getServletContext() + .getAttribute(GatewayServices.GATEWAY_SERVICES_ATTRIBUTE); + if (gatewayConfig != null && gatewayConfig.isMetricsEnabled()) { + MetricsService metricsService = services.getService(ServiceType.METRICS_SERVICE); + builder = metricsService.getInstrumented(HttpAsyncClientBuilder.class); + } else { + builder = HttpAsyncClients.custom(); + } + + // Conditionally set a custom SSLContext + SSLContext sslContext = createSSLContext(services, filterConfig, serviceRole); + if (sslContext != null) { + builder.setSSLContext(sslContext); + } + + if (Boolean.parseBoolean(System.getProperty(GatewayConfig.HADOOP_KERBEROS_SECURED))) { + CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials(AuthScope.ANY, new DefaultHttpAsyncClientFactory.UseJaasCredentials()); + + Registry authSchemeRegistry = RegistryBuilder.create() + .register(AuthSchemes.SPNEGO, new KnoxSpnegoAuthSchemeFactory(true)) + .build(); + + builder.setDefaultAuthSchemeRegistry(authSchemeRegistry) + .setDefaultCookieStore(new HadoopAuthCookieStore(gatewayConfig)) + .setDefaultCredentialsProvider(credentialsProvider); + } else { + builder.setDefaultCookieStore(new DefaultHttpAsyncClientFactory.NoCookieStore()); + } + + builder.setKeepAliveStrategy(DefaultConnectionKeepAliveStrategy.INSTANCE); + builder.setConnectionReuseStrategy(DefaultConnectionReuseStrategy.INSTANCE); + builder.setRedirectStrategy(new DefaultHttpAsyncClientFactory.NeverRedirectStrategy()); + int maxConnections = getMaxConnections(filterConfig); + builder.setMaxConnTotal(maxConnections); + builder.setMaxConnPerRoute(maxConnections); + builder.setDefaultRequestConfig(getRequestConfig(filterConfig, serviceRole)); + + return builder.build(); + } +} diff --git a/gateway-spi/src/main/java/org/apache/knox/gateway/dispatch/DefaultHttpClientFactory.java b/gateway-spi/src/main/java/org/apache/knox/gateway/dispatch/DefaultHttpClientFactory.java index 03a4f891a0..cb658c8fe0 100644 --- a/gateway-spi/src/main/java/org/apache/knox/gateway/dispatch/DefaultHttpClientFactory.java +++ b/gateway-spi/src/main/java/org/apache/knox/gateway/dispatch/DefaultHttpClientFactory.java @@ -69,7 +69,7 @@ public class DefaultHttpClientFactory implements HttpClientFactory { private static final SpiGatewayMessages LOG = MessagesFactory.get(SpiGatewayMessages.class); - private static final String PARAMETER_SERVICE_ROLE = "serviceRole"; + protected static final String PARAMETER_SERVICE_ROLE = "serviceRole"; static final String PARAMETER_USE_TWO_WAY_SSL = "useTwoWaySsl"; /* retry in case of NoHttpResponseException */ static final String PARAMETER_RETRY_COUNT = "retryCount"; @@ -255,7 +255,7 @@ static RequestConfig getRequestConfig(FilterConfig config, String serviceRole) { return builder.build(); } - private static class NoCookieStore implements CookieStore { + protected static class NoCookieStore implements CookieStore { @Override public void addCookie(Cookie cookie) { //no op @@ -277,7 +277,7 @@ public void clear() { } } - private static class NeverRedirectStrategy implements RedirectStrategy { + protected static class NeverRedirectStrategy implements RedirectStrategy { @Override public boolean isRedirected( HttpRequest request, HttpResponse response, HttpContext context ) throws ProtocolException { @@ -298,7 +298,7 @@ public boolean retryRequest( IOException exception, int executionCount, HttpCont } } - private static class UseJaasCredentials implements Credentials { + protected static class UseJaasCredentials implements Credentials { @Override public String getPassword() { @@ -312,7 +312,7 @@ public Principal getUserPrincipal() { } - private int getMaxConnections( FilterConfig filterConfig ) { + protected int getMaxConnections( FilterConfig filterConfig ) { int maxConnections = 32; GatewayConfig config = (GatewayConfig)filterConfig.getServletContext().getAttribute( GatewayConfig.GATEWAY_CONFIG_ATTRIBUTE ); @@ -330,7 +330,7 @@ private int getMaxConnections( FilterConfig filterConfig ) { return maxConnections; } - private static int getConnectionTimeout( FilterConfig filterConfig ) { + protected static int getConnectionTimeout( FilterConfig filterConfig ) { int timeout = -1; GatewayConfig globalConfig = (GatewayConfig)filterConfig.getServletContext().getAttribute( GatewayConfig.GATEWAY_CONFIG_ATTRIBUTE ); @@ -348,7 +348,7 @@ private static int getConnectionTimeout( FilterConfig filterConfig ) { return timeout; } - private static int getSocketTimeout( FilterConfig filterConfig ) { + protected static int getSocketTimeout( FilterConfig filterConfig ) { int timeout = -1; GatewayConfig globalConfig = (GatewayConfig)filterConfig.getServletContext().getAttribute( GatewayConfig.GATEWAY_CONFIG_ATTRIBUTE ); @@ -366,7 +366,7 @@ private static int getSocketTimeout( FilterConfig filterConfig ) { return timeout; } - private static long parseTimeout( String s ) { + protected static long parseTimeout( String s ) { PeriodFormatter f = new PeriodFormatterBuilder() .appendMinutes().appendSuffix("m"," min") .appendSeconds().appendSuffix("s"," sec") @@ -375,7 +375,7 @@ private static long parseTimeout( String s ) { return p.toStandardDuration().getMillis(); } - private static String getCookieSpec(FilterConfig filterConfig) { + protected static String getCookieSpec(FilterConfig filterConfig) { String cookieSpec = filterConfig.getInitParameter("httpclient.cookieSpec"); if (StringUtils.isNotBlank(cookieSpec)) { return cookieSpec; diff --git a/gateway-spi/src/main/java/org/apache/knox/gateway/dispatch/GatewayDispatchFilter.java b/gateway-spi/src/main/java/org/apache/knox/gateway/dispatch/GatewayDispatchFilter.java index 870d3f7883..48196d1379 100644 --- a/gateway-spi/src/main/java/org/apache/knox/gateway/dispatch/GatewayDispatchFilter.java +++ b/gateway-spi/src/main/java/org/apache/knox/gateway/dispatch/GatewayDispatchFilter.java @@ -32,6 +32,7 @@ import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import java.io.IOException; +import java.lang.reflect.Constructor; import java.net.MalformedURLException; import java.net.URISyntaxException; import java.util.Collections; @@ -71,13 +72,13 @@ public void init(FilterConfig filterConfig) throws ServletException { synchronized(lock) { if (dispatch == null) { String dispatchImpl = filterConfig.getInitParameter("dispatch-impl"); - dispatch = newInstanceFromName(dispatchImpl); + dispatch = newInstanceFromName(dispatchImpl, filterConfig); } ConfigurationInjectorBuilder.configuration().target(dispatch).source(filterConfig).inject(); HttpClientFactory httpClientFactory; String httpClientFactoryClass = filterConfig.getInitParameter("httpClientFactory"); if (httpClientFactoryClass != null) { - httpClientFactory = newInstanceFromName(httpClientFactoryClass); + httpClientFactory = newInstanceFromName(httpClientFactoryClass, filterConfig); } else { httpClientFactory = new DefaultHttpClientFactory(); } @@ -217,15 +218,28 @@ public void doMethod(Dispatch dispatch, HttpServletRequest request, HttpServletR } } - private T newInstanceFromName(String dispatchImpl) throws ServletException { + private T newInstanceFromName(String dispatchImpl, FilterConfig filterConfig) throws ServletException { try { Class clazz = loadClass(dispatchImpl); + Constructor constructor = this.getConstructorWithType(clazz, FilterConfig.class); + if(constructor != null) { + return (T) constructor.newInstance(filterConfig); + } return clazz.newInstance(); } catch ( Exception e ) { throw new ServletException(e); } } + private Constructor getConstructorWithType(Class clazz, Class type) { + for (Constructor constructor : clazz.getConstructors()) { + if (constructor.getParameterCount() == 1 && constructor.getParameterTypes()[0] == type) { + return constructor; + } + } + return null; + } + private Class loadClass(String className) throws ClassNotFoundException { ClassLoader loader = Thread.currentThread().getContextClassLoader(); if ( loader == null ) { diff --git a/gateway-spi/src/main/java/org/apache/knox/gateway/dispatch/HttpAsyncClientFactory.java b/gateway-spi/src/main/java/org/apache/knox/gateway/dispatch/HttpAsyncClientFactory.java new file mode 100644 index 0000000000..c5180ec01a --- /dev/null +++ b/gateway-spi/src/main/java/org/apache/knox/gateway/dispatch/HttpAsyncClientFactory.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.knox.gateway.dispatch; + +import org.apache.http.nio.client.HttpAsyncClient; + +import javax.servlet.FilterConfig; + +public interface HttpAsyncClientFactory extends HttpClientFactory { + HttpAsyncClient createAsyncHttpClient(FilterConfig filterConfig); +} diff --git a/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEDispatch.java b/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEDispatch.java new file mode 100644 index 0000000000..318c4bb82f --- /dev/null +++ b/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEDispatch.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.knox.gateway.sse; + +import org.apache.http.HttpHeaders; +import org.apache.http.HttpResponse; +import org.apache.http.HttpStatus; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPatch; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.methods.HttpPut; +import org.apache.http.client.methods.HttpUriRequest; +import org.apache.http.concurrent.FutureCallback; +import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; +import org.apache.http.nio.IOControl; +import org.apache.http.nio.client.HttpAsyncClient; +import org.apache.http.nio.client.methods.AsyncCharConsumer; +import org.apache.http.nio.client.methods.HttpAsyncMethods; +import org.apache.http.nio.protocol.HttpAsyncRequestProducer; +import org.apache.http.protocol.HttpContext; +import org.apache.knox.gateway.audit.api.Action; +import org.apache.knox.gateway.audit.api.ActionOutcome; +import org.apache.knox.gateway.audit.api.ResourceType; +import org.apache.knox.gateway.dispatch.ConfigurableDispatch; +import org.apache.knox.gateway.dispatch.DefaultHttpAsyncClientFactory; +import org.apache.knox.gateway.dispatch.HttpAsyncClientFactory; + +import javax.servlet.AsyncContext; +import javax.servlet.FilterConfig; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.CharBuffer; +import java.nio.charset.StandardCharsets; + +public class SSEDispatch extends ConfigurableDispatch { + + private final HttpAsyncClient asyncClient; + private static final String TEXT_EVENT_STREAM_VALUE = "text/event-stream"; + + public SSEDispatch(FilterConfig filterConfig) { + HttpAsyncClientFactory asyncClientFactory = new DefaultHttpAsyncClientFactory(); + this.asyncClient = asyncClientFactory.createAsyncHttpClient(filterConfig); + + if (asyncClient instanceof CloseableHttpAsyncClient) { + ((CloseableHttpAsyncClient) this.asyncClient).start(); + } + } + + @Override + public void doGet(URI url, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse) + throws IOException { + final HttpGet httpGetRequest = new HttpGet(url); + this.doHttpMethod(httpGetRequest, inboundRequest, outboundResponse); + } + + @Override + public void doPost(URI url, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse) + throws IOException, URISyntaxException { + final HttpPost httpPostRequest = new HttpPost(url); + httpPostRequest.setEntity(this.createRequestEntity(inboundRequest)); + this.doHttpMethod(httpPostRequest, inboundRequest, outboundResponse); + } + + @Override + public void doPut(URI url, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse) + throws IOException { + final HttpPut httpPutRequest = new HttpPut(url); + httpPutRequest.setEntity(this.createRequestEntity(inboundRequest)); + this.doHttpMethod(httpPutRequest, inboundRequest, outboundResponse); + } + + @Override + public void doPatch(URI url, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse) + throws IOException { + final HttpPatch httpPatchRequest = new HttpPatch(url); + httpPatchRequest.setEntity(this.createRequestEntity(inboundRequest)); + this.doHttpMethod(httpPatchRequest, inboundRequest, outboundResponse); + } + + private void doHttpMethod(HttpUriRequest httpMethod, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse) throws IOException { + this.addAcceptHeader(httpMethod); + this.copyRequestHeaderFields(httpMethod, inboundRequest); + this.executeRequest(httpMethod, outboundResponse, inboundRequest); + } + + private void executeRequest(HttpUriRequest outboundRequest, HttpServletResponse outboundResponse, HttpServletRequest inboundRequest) { + AsyncContext asyncContext = inboundRequest.startAsync(); + //No timeout + asyncContext.setTimeout(0L); + + HttpAsyncRequestProducer producer = HttpAsyncMethods.create(outboundRequest); + AsyncCharConsumer consumer = new SSECharConsumer(outboundResponse, outboundRequest.getURI(), asyncContext); + this.executeAsyncRequest(producer, consumer, outboundRequest); + } + + private void executeAsyncRequest(HttpAsyncRequestProducer producer, AsyncCharConsumer consumer, HttpUriRequest outboundRequest) { + LOG.dispatchRequest(outboundRequest.getMethod(), outboundRequest.getURI()); + auditor.audit(Action.DISPATCH, outboundRequest.getURI().toString(), ResourceType.URI, ActionOutcome.UNAVAILABLE, RES.requestMethod(outboundRequest.getMethod())); + asyncClient.execute(producer, consumer, new FutureCallback() { + + @Override + public void completed(final SSEResponse response) { + closeProducer(producer); + LOG.sseConnectionDone(); + } + + @Override + public void failed(final Exception ex) { + closeProducer(producer); + LOG.sseConnectionError(ex.getMessage()); + } + + @Override + public void cancelled() { + closeProducer(producer); + LOG.sseConnectionCancelled(); + } + }); + } + + private void addAcceptHeader(HttpUriRequest outboundRequest) { + outboundRequest.setHeader(HttpHeaders.ACCEPT, SSEDispatch.TEXT_EVENT_STREAM_VALUE); + } + + private void handleSuccessResponse(HttpServletResponse outboundResponse, URI url, HttpResponse inboundResponse) { + this.prepareServletResponse(outboundResponse, inboundResponse.getStatusLine().getStatusCode()); + this.copyResponseHeaderFields(outboundResponse, inboundResponse); + auditor.audit(Action.DISPATCH, url.toString(), ResourceType.URI, ActionOutcome.SUCCESS, RES.responseStatus(HttpStatus.SC_OK)); + } + + private void handleErrorResponse(HttpServletResponse outboundResponse, URI url, HttpResponse httpResponse) { + int statusCode = httpResponse.getStatusLine().getStatusCode(); + outboundResponse.setStatus(statusCode); + LOG.dispatchResponseStatusCode(statusCode); + auditor.audit(Action.DISPATCH, url.toString(), ResourceType.URI, ActionOutcome.FAILURE, RES.responseStatus(statusCode)); + } + + private void prepareServletResponse(HttpServletResponse outboundResponse, int statusCode) { + LOG.dispatchResponseStatusCode(statusCode); + outboundResponse.setStatus(statusCode); + outboundResponse.setCharacterEncoding(StandardCharsets.UTF_8.name()); + } + + private boolean isSuccessful(int statusCode) { + return (statusCode >= HttpStatus.SC_OK && statusCode < 300); + } + + private void closeProducer(HttpAsyncRequestProducer producer) { + try { + producer.close(); + } catch (IOException e) { + LOG.sseProducerCloseError(e); + } + } + + private class SSECharConsumer extends AsyncCharConsumer { + private SSEResponse sseResponse; + private final HttpServletResponse outboundResponse; + private final URI url; + private final AsyncContext asyncContext; + + SSECharConsumer(HttpServletResponse outboundResponse, URI url, AsyncContext asyncContext) { + this.outboundResponse = outboundResponse; + this.url = url; + this.asyncContext = asyncContext; + } + + @Override + protected void onResponseReceived(final HttpResponse inboundResponse) { + this.sseResponse = new SSEResponse(inboundResponse); + if (isSuccessful(inboundResponse.getStatusLine().getStatusCode())) { + handleSuccessResponse(outboundResponse, url, inboundResponse); + } else { + handleErrorResponse(outboundResponse, url, inboundResponse); + } + } + + @Override + protected void onCharReceived(final CharBuffer buf, final IOControl ioctl) { + try { + if (this.sseResponse.getEntity().readCharBuffer(buf)) { + this.sseResponse.getEntity().sendEvent(this.asyncContext); + } + } catch (InterruptedException | IOException e) { + LOG.errorWritingOutputStream(e); + throw new SSEException(e.getMessage(), e); + } + } + + @Override + protected void releaseResources() { + this.asyncContext.complete(); + } + + @Override + protected SSEResponse buildResult(final HttpContext context) { + return this.sseResponse; + + } + } + + @Override + public void destroy() { + try { + if (this.asyncClient != null && asyncClient instanceof CloseableHttpAsyncClient) { + ((CloseableHttpAsyncClient) asyncClient).close(); + } + } catch (IOException e) { + LOG.errorClosingHttpClient(e); + } + } + + public HttpAsyncClient getAsyncClient() { + return this.asyncClient; + } +} diff --git a/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEEntity.java b/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEEntity.java new file mode 100644 index 0000000000..8b598c631b --- /dev/null +++ b/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEEntity.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.knox.gateway.sse; + +import org.apache.http.HttpEntity; +import org.apache.http.entity.AbstractHttpEntity; + +import javax.servlet.AsyncContext; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.CharBuffer; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +public class SSEEntity extends AbstractHttpEntity { + + private static final String SSE_DELIMITER = ":"; + + private final BlockingQueue eventQueue; + private final StringBuilder eventBuilder = new StringBuilder(); + private final HttpEntity httpEntity; + private char previousChar = '0'; + + public SSEEntity(HttpEntity httpEntity) { + this.httpEntity = httpEntity; + this.eventQueue = new LinkedBlockingQueue<>(); + } + + public boolean readCharBuffer(CharBuffer charBuffer) { + while (charBuffer.hasRemaining()) { + processChar(charBuffer.get()); + } + return !eventQueue.isEmpty(); + } + + //Two new line chars (\n\n) after each other means the event is finished streaming + //We can process it and add it to the event queue + private void processChar(char nextChar) { + if (isNewLineChar(nextChar) && isNewLineChar(previousChar)) { + processEvent(); + eventBuilder.setLength(0); + previousChar = '0'; + } else { + eventBuilder.append(nextChar); + previousChar = nextChar; + } + } + + private boolean isNewLineChar(char c) { + return c == '\n' || c == '\r' || c == '\u0085' || c == '\u2028' || c == '\u2029'; + } + + private void processEvent() { + String unprocessedEvent = eventBuilder.toString(); + SSEvent ssEvent = new SSEvent(); + + for (String line : unprocessedEvent.split("\\R")) { + String[] lineTokens = this.parseLine(line); + switch (lineTokens[0]) { + case "id": + ssEvent.setId(lineTokens[1].trim()); + break; + case "event": + ssEvent.setEvent(lineTokens[1].trim()); + break; + case "data": + ssEvent.setData(lineTokens[1].trim()); + break; + case "comment": + ssEvent.setComment(lineTokens[1].trim()); + break; + case "retry": + ssEvent.setRetry(Long.parseLong(lineTokens[1].trim())); + break; + default: + break; + } + } + eventQueue.add(ssEvent); + } + + private String[] parseLine(String line) { + String[] lineTokens = new String[2]; + if(line.startsWith(SSE_DELIMITER)) { + lineTokens[0] = "comment"; + lineTokens[1] = line; + } else { + lineTokens = line.split(SSE_DELIMITER, 2); + } + + return lineTokens; + } + + public void sendEvent(AsyncContext asyncContext) throws IOException, InterruptedException { + while (!eventQueue.isEmpty()) { + SSEvent event = eventQueue.take(); + asyncContext.getResponse().getWriter().write(event.toString()); + asyncContext.getResponse().getWriter().println('\n'); + + //Calling response.flushBuffer() instead of writer.flush(). + //This way an exception is thrown if the connection is already closed on the client side. + asyncContext.getResponse().flushBuffer(); + } + } + + public BlockingQueue getEventQueue() { + return eventQueue; + } + + @Override + public boolean isRepeatable() { + return httpEntity.isRepeatable(); + } + + @Override + public long getContentLength() { + return httpEntity.getContentLength(); + } + + @Override + public InputStream getContent() throws IOException, UnsupportedOperationException { + return httpEntity.getContent(); + } + + @Override + public void writeTo(OutputStream outStream) throws IOException { + httpEntity.writeTo(outStream); + } + + @Override + public boolean isStreaming() { + return httpEntity.isStreaming(); + } +} diff --git a/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEException.java b/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEException.java new file mode 100644 index 0000000000..a2bf9ae1b2 --- /dev/null +++ b/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEException.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.knox.gateway.sse; + +public class SSEException extends RuntimeException { + + public SSEException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEResponse.java b/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEResponse.java new file mode 100644 index 0000000000..47b7bfa16d --- /dev/null +++ b/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEResponse.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.knox.gateway.sse; + +import org.apache.http.Header; +import org.apache.http.HeaderIterator; +import org.apache.http.HttpResponse; +import org.apache.http.ProtocolVersion; +import org.apache.http.StatusLine; +import org.apache.http.message.BasicHttpResponse; + +import java.util.Locale; + +public class SSEResponse extends BasicHttpResponse { + private final HttpResponse inboundResponse; + private final SSEEntity entity; + + public SSEResponse(HttpResponse inboundResponse) { + super(inboundResponse.getStatusLine()); + this.inboundResponse = inboundResponse; + this.entity = new SSEEntity(inboundResponse.getEntity()); + } + + @Override + public SSEEntity getEntity() { + return entity; + } + + @Override + public StatusLine getStatusLine() { + return inboundResponse.getStatusLine(); + } + + @Override + public void setStatusLine(StatusLine statusline) { + inboundResponse.setStatusLine(statusline); + } + + @Override + public void setStatusLine(ProtocolVersion ver, int code) { + inboundResponse.setStatusLine(ver, code); + } + + @Override + public void setStatusLine(ProtocolVersion ver, int code, String reason) { + inboundResponse.setStatusLine(ver, code, reason); + } + + @Override + public void setStatusCode(int code) throws IllegalStateException { + inboundResponse.setStatusCode(code); + } + + @Override + public void setReasonPhrase(String reason) throws IllegalStateException { + inboundResponse.setReasonPhrase(reason); + } + + @Override + public Locale getLocale() { + return inboundResponse.getLocale(); + } + + @Override + public void setLocale(Locale loc) { + inboundResponse.setLocale(loc); + } + + @Override + public ProtocolVersion getProtocolVersion() { + return inboundResponse.getProtocolVersion(); + } + + @Override + public boolean containsHeader(String name) { + return inboundResponse.containsHeader(name); + } + + @Override + public Header[] getHeaders(String name) { + return inboundResponse.getHeaders(name); + } + + @Override + public Header getFirstHeader(String name) { + return inboundResponse.getFirstHeader(name); + } + + @Override + public Header getLastHeader(String name) { + return inboundResponse.getLastHeader(name); + } + + @Override + public Header[] getAllHeaders() { + return inboundResponse.getAllHeaders(); + } + + @Override + public void addHeader(Header header) { + inboundResponse.addHeader(header); + } + + @Override + public void addHeader(String name, String value) { + inboundResponse.addHeader(name, value); + } + + @Override + public void setHeader(Header header) { + inboundResponse.setHeader(header); + } + + @Override + public void setHeader(String name, String value) { + inboundResponse.setHeader(name, value); + } + + @Override + public void setHeaders(Header[] headers) { + inboundResponse.setHeaders(headers); + } + + @Override + public void removeHeader(Header header) { + inboundResponse.removeHeader(header); + } + + @Override + public void removeHeaders(String name) { + inboundResponse.removeHeaders(name); + } + + @Override + public HeaderIterator headerIterator() { + return inboundResponse.headerIterator(); + } + + @Override + public HeaderIterator headerIterator(String name) { + return inboundResponse.headerIterator(name); + } +} diff --git a/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEvent.java b/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEvent.java new file mode 100644 index 0000000000..dc643ed0e7 --- /dev/null +++ b/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEvent.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.knox.gateway.sse; + +public class SSEvent { + + private String data; + private String event; + private String id; + private String comment; + private Long retry; + + public SSEvent() { + } + + public SSEvent(String data, String event, String id, String comment, Long retry) { + this.data = data; + this.event = event; + this.id = id; + this.comment = comment; + this.retry = retry; + } + + public void setData(String data) { + this.data = data; + } + + public void setEvent(String event) { + this.event = event; + } + + public void setId(String id) { + this.id = id; + } + + public void setComment(String comment) { + this.comment = comment; + } + + public void setRetry(Long retry) { + this.retry = retry; + } + + public String getData() { + return this.data; + } + + public String getEvent() { + return this.event; + } + + public String getId() { + return this.id; + } + + public String getComment() { + return this.comment; + } + + public Long getRetry() { + return this.retry; + } + + @Override + public String toString() { + StringBuilder eventString = new StringBuilder(); + + this.appendField(eventString, this.id, "id:"); + this.appendField(eventString, this.event, "event:"); + this.appendField(eventString, this.data, "data:"); + this.appendField(eventString, this.retry, "retry:"); + this.appendField(eventString, this.comment, ""); + + return eventString.toString(); + } + + private void appendField(StringBuilder eventString, Object field, String prefix) { + if (field != null) { + if (eventString.length() != 0) { + eventString.append('\n'); + } + eventString.append(prefix); + eventString.append(field); + } + } +} diff --git a/gateway-spi/src/test/java/org/apache/knox/gateway/dispatch/DefaultHttpAsyncClientFactoryTest.java b/gateway-spi/src/test/java/org/apache/knox/gateway/dispatch/DefaultHttpAsyncClientFactoryTest.java new file mode 100644 index 0000000000..001f830c2b --- /dev/null +++ b/gateway-spi/src/test/java/org/apache/knox/gateway/dispatch/DefaultHttpAsyncClientFactoryTest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.knox.gateway.dispatch; + +import org.apache.http.client.config.CookieSpecs; +import org.apache.http.nio.client.HttpAsyncClient; +import org.apache.knox.gateway.config.GatewayConfig; +import org.apache.knox.gateway.services.GatewayServices; +import org.apache.knox.gateway.services.ServiceType; +import org.apache.knox.gateway.services.security.KeystoreService; +import org.junit.Test; + +import javax.servlet.FilterConfig; +import javax.servlet.ServletContext; + +import static org.easymock.EasyMock.createMock; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.verify; +import static org.junit.Assert.assertNotNull; + +public class DefaultHttpAsyncClientFactoryTest { + + @Test + public void testCreateHttpAsyncClientSSLContextDefaults() throws Exception { + KeystoreService keystoreService = createMock(KeystoreService.class); + expect(keystoreService.getTruststoreForHttpClient()).andReturn(null).once(); + + GatewayConfig gatewayConfig = createMock(GatewayConfig.class); + expect(gatewayConfig.isMetricsEnabled()).andReturn(false).once(); + expect(gatewayConfig.getHttpClientMaxConnections()).andReturn(32).once(); + expect(gatewayConfig.getHttpClientConnectionTimeout()).andReturn(20000).once(); + expect(gatewayConfig.getHttpClientSocketTimeout()).andReturn(20000).once(); + expect(gatewayConfig.getHttpClientCookieSpec()).andReturn(CookieSpecs.STANDARD).anyTimes(); + + GatewayServices gatewayServices = createMock(GatewayServices.class); + expect(gatewayServices.getService(ServiceType.KEYSTORE_SERVICE)).andReturn(keystoreService).once(); + + ServletContext servletContext = createMock(ServletContext.class); + expect(servletContext.getAttribute(GatewayConfig.GATEWAY_CONFIG_ATTRIBUTE)).andReturn(gatewayConfig).atLeastOnce(); + expect(servletContext.getAttribute(GatewayServices.GATEWAY_SERVICES_ATTRIBUTE)).andReturn(gatewayServices).atLeastOnce(); + + FilterConfig filterConfig = createMock(FilterConfig.class); + expect(filterConfig.getServletContext()).andReturn(servletContext).atLeastOnce(); + expect(filterConfig.getInitParameter("useTwoWaySsl")).andReturn("false").once(); + expect(filterConfig.getInitParameter("httpclient.maxConnections")).andReturn(null).once(); + expect(filterConfig.getInitParameter("httpclient.connectionTimeout")).andReturn(null).once(); + expect(filterConfig.getInitParameter("httpclient.socketTimeout")).andReturn(null).once(); + expect(filterConfig.getInitParameter("serviceRole")).andReturn(null).once(); + expect(filterConfig.getInitParameter("httpclient.cookieSpec")).andReturn(null).anyTimes(); + + replay(keystoreService, gatewayConfig, gatewayServices, servletContext, filterConfig); + + DefaultHttpAsyncClientFactory factory = new DefaultHttpAsyncClientFactory(); + HttpAsyncClient client = factory.createAsyncHttpClient(filterConfig); + assertNotNull(client); + + verify(keystoreService, gatewayConfig, gatewayServices, servletContext, filterConfig); + } +} diff --git a/gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEDispatchTest.java b/gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEDispatchTest.java new file mode 100644 index 0000000000..66abcb7913 --- /dev/null +++ b/gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEDispatchTest.java @@ -0,0 +1,483 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.knox.gateway.sse; + +import org.apache.http.HttpStatus; +import org.apache.http.client.config.CookieSpecs; +import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; +import org.apache.knox.gateway.config.GatewayConfig; +import org.apache.knox.gateway.services.GatewayServices; +import org.apache.knox.gateway.services.ServiceType; +import org.apache.knox.gateway.services.security.KeystoreService; +import org.apache.knox.test.mock.MockServer; +import org.apache.knox.test.mock.MockServletContext; +import org.apache.knox.test.mock.MockServletInputStream; +import org.easymock.EasyMock; +import org.junit.BeforeClass; +import org.junit.Test; + +import javax.servlet.AsyncContext; +import javax.servlet.FilterConfig; +import javax.servlet.ServletContext; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.io.PrintWriter; +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.easymock.EasyMock.createMock; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.replay; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +public class SSEDispatchTest { + + private static MockServer MOCK_SSE_SERVER; + private static URI URL; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + MOCK_SSE_SERVER = new MockServer("SSE", true); + URL = new URI("http://localhost:" + MOCK_SSE_SERVER.getPort() + "/sse"); + } + + @Test + public void testCreateAndDestroyClient() throws Exception { + SSEDispatch sseDispatch = this.createDispatch(); + assertNotNull(sseDispatch.getAsyncClient()); + + sseDispatch.destroy(); + assertFalse(((CloseableHttpAsyncClient) sseDispatch.getAsyncClient()).isRunning()); + } + + @Test + public void testGet2xx() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + SSEDispatch sseDispatch = this.createDispatch(); + PrintWriter printWriter = EasyMock.createNiceMock(PrintWriter.class); + HttpServletResponse outboundResponse = this.getServletResponse(HttpStatus.SC_OK); + AsyncContext asyncContext = this.getAsyncContext(latch, outboundResponse); + HttpServletRequest inboundRequest = this.getHttpServletRequest(asyncContext); + + this.expectResponseBodyAndHeader(printWriter, outboundResponse); + replay(inboundRequest, asyncContext, outboundResponse, printWriter); + + MOCK_SSE_SERVER.expect() + .method("GET") + .pathInfo("/sse") + .header("request", "header") + .header("Accept", "text/event-stream") + .respond() + .status(HttpStatus.SC_OK) + .content("id:1\ndata:data1\nevent:event1\n\ndata:data2\nevent:event2\nid:2\nretry:1\n:testing\n\n", StandardCharsets.UTF_8) + .header("response", "header") + .contentType("text/event-stream"); + + sseDispatch.doGet(URL, inboundRequest, outboundResponse); + + latch.await(1L, TimeUnit.SECONDS); + EasyMock.verify(asyncContext, outboundResponse, inboundRequest, printWriter); + assertTrue(MOCK_SSE_SERVER.isEmpty()); + } + + @Test + public void testGet4xx() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + SSEDispatch sseDispatch = this.createDispatch(); + HttpServletResponse outboundResponse = this.getServletResponse(HttpStatus.SC_BAD_REQUEST); + AsyncContext asyncContext = this.getAsyncContext(latch, outboundResponse); + HttpServletRequest inboundRequest = this.getHttpServletRequest(asyncContext); + + replay(inboundRequest, asyncContext, outboundResponse); + + MOCK_SSE_SERVER.expect() + .method("GET") + .pathInfo("/sse") + .respond() + .status(HttpStatus.SC_BAD_REQUEST); + + sseDispatch.doGet(URL, inboundRequest, outboundResponse); + + latch.await(1L, TimeUnit.SECONDS); + EasyMock.verify(asyncContext, outboundResponse, inboundRequest); + assertTrue(MOCK_SSE_SERVER.isEmpty()); + } + + @Test + public void testGet5xx() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + SSEDispatch sseDispatch = this.createDispatch(); + HttpServletResponse outboundResponse = this.getServletResponse(HttpStatus.SC_INTERNAL_SERVER_ERROR); + AsyncContext asyncContext = this.getAsyncContext(latch, outboundResponse); + HttpServletRequest inboundRequest = this.getHttpServletRequest(asyncContext); + + replay(inboundRequest, asyncContext, outboundResponse); + + MOCK_SSE_SERVER.expect() + .method("GET") + .pathInfo("/sse") + .respond() + .status(HttpStatus.SC_INTERNAL_SERVER_ERROR); + + sseDispatch.doGet(URL, inboundRequest, outboundResponse); + + latch.await(1L, TimeUnit.SECONDS); + EasyMock.verify(asyncContext, outboundResponse, inboundRequest); + assertTrue(MOCK_SSE_SERVER.isEmpty()); + } + + @Test + public void testPost2xx() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + SSEDispatch sseDispatch = this.createDispatch(); + PrintWriter printWriter = EasyMock.createNiceMock(PrintWriter.class); + HttpServletResponse outboundResponse = this.getServletResponse(HttpStatus.SC_OK); + AsyncContext asyncContext = this.getAsyncContext(latch, outboundResponse); + HttpServletRequest inboundRequest = this.getHttpServletRequest(asyncContext); + + this.expectResponseBodyAndHeader(printWriter, outboundResponse); + replay(inboundRequest, asyncContext, outboundResponse, printWriter); + + MOCK_SSE_SERVER.expect() + .method("POST") + .pathInfo("/sse") + .header("request", "header") + .header("Accept", "text/event-stream") + .content("{\"request\":\"body\"}", StandardCharsets.UTF_8) + .respond() + .status(HttpStatus.SC_OK) + .content("id:1\ndata:data1\nevent:event1\n\ndata:data2\nevent:event2\nid:2\nretry:1\n:testing\n\n", StandardCharsets.UTF_8) + .header("response", "header") + .contentType("text/event-stream"); + + sseDispatch.doPost(URL, inboundRequest, outboundResponse); + + latch.await(1L, TimeUnit.SECONDS); + EasyMock.verify(asyncContext, outboundResponse, inboundRequest, printWriter); + assertTrue(MOCK_SSE_SERVER.isEmpty()); + } + + @Test + public void testPost4xx() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + SSEDispatch sseDispatch = this.createDispatch(); + HttpServletResponse outboundResponse = this.getServletResponse(HttpStatus.SC_NOT_FOUND); + AsyncContext asyncContext = this.getAsyncContext(latch, outboundResponse); + HttpServletRequest inboundRequest = this.getHttpServletRequest(asyncContext); + + replay(inboundRequest, asyncContext, outboundResponse); + + MOCK_SSE_SERVER.expect() + .method("POST") + .pathInfo("/sse") + .respond() + .status(HttpStatus.SC_NOT_FOUND); + + sseDispatch.doPost(URL, inboundRequest, outboundResponse); + + latch.await(1L, TimeUnit.SECONDS); + EasyMock.verify(asyncContext, outboundResponse, inboundRequest); + assertTrue(MOCK_SSE_SERVER.isEmpty()); + } + + @Test + public void testPost5xx() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + SSEDispatch sseDispatch = this.createDispatch(); + HttpServletResponse outboundResponse = this.getServletResponse(HttpStatus.SC_INTERNAL_SERVER_ERROR); + AsyncContext asyncContext = this.getAsyncContext(latch, outboundResponse); + HttpServletRequest inboundRequest = this.getHttpServletRequest(asyncContext); + + replay(inboundRequest, asyncContext, outboundResponse); + + MOCK_SSE_SERVER.expect() + .method("POST") + .pathInfo("/sse") + .respond() + .status(HttpStatus.SC_INTERNAL_SERVER_ERROR); + + sseDispatch.doPost(URL, inboundRequest, outboundResponse); + + latch.await(1L, TimeUnit.SECONDS); + EasyMock.verify(asyncContext, outboundResponse, inboundRequest); + assertTrue(MOCK_SSE_SERVER.isEmpty()); + } + + @Test + public void testPut2xx() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + SSEDispatch sseDispatch = this.createDispatch(); + PrintWriter printWriter = EasyMock.createNiceMock(PrintWriter.class); + HttpServletResponse outboundResponse = this.getServletResponse(HttpStatus.SC_OK); + AsyncContext asyncContext = this.getAsyncContext(latch, outboundResponse); + HttpServletRequest inboundRequest = this.getHttpServletRequest(asyncContext); + + this.expectResponseBodyAndHeader(printWriter, outboundResponse); + replay(inboundRequest, asyncContext, outboundResponse, printWriter); + + MOCK_SSE_SERVER.expect() + .method("PUT") + .pathInfo("/sse") + .header("request", "header") + .header("Accept", "text/event-stream") + .content("{\"request\":\"body\"}", StandardCharsets.UTF_8) + .respond() + .status(HttpStatus.SC_OK) + .content("id:1\ndata:data1\nevent:event1\n\ndata:data2\nevent:event2\nid:2\nretry:1\n:testing\n\n", StandardCharsets.UTF_8) + .header("response", "header") + .contentType("text/event-stream"); + + sseDispatch.doPut(URL, inboundRequest, outboundResponse); + + latch.await(1L, TimeUnit.SECONDS); + EasyMock.verify(asyncContext, outboundResponse, inboundRequest, printWriter); + assertTrue(MOCK_SSE_SERVER.isEmpty()); + } + + @Test + public void testPut4xx() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + SSEDispatch sseDispatch = this.createDispatch(); + HttpServletResponse outboundResponse = this.getServletResponse(HttpStatus.SC_NOT_FOUND); + AsyncContext asyncContext = this.getAsyncContext(latch, outboundResponse); + HttpServletRequest inboundRequest = this.getHttpServletRequest(asyncContext); + + replay(inboundRequest, asyncContext, outboundResponse); + + MOCK_SSE_SERVER.expect() + .method("PUT") + .pathInfo("/sse") + .respond() + .status(HttpStatus.SC_NOT_FOUND); + + sseDispatch.doPut(URL, inboundRequest, outboundResponse); + + latch.await(1L, TimeUnit.SECONDS); + EasyMock.verify(asyncContext, outboundResponse, inboundRequest); + assertTrue(MOCK_SSE_SERVER.isEmpty()); + } + + @Test + public void testPut5xx() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + SSEDispatch sseDispatch = this.createDispatch(); + HttpServletResponse outboundResponse = this.getServletResponse(HttpStatus.SC_INTERNAL_SERVER_ERROR); + AsyncContext asyncContext = this.getAsyncContext(latch, outboundResponse); + HttpServletRequest inboundRequest = this.getHttpServletRequest(asyncContext); + + replay(inboundRequest, asyncContext, outboundResponse); + + MOCK_SSE_SERVER.expect() + .method("PUT") + .pathInfo("/sse") + .respond() + .status(HttpStatus.SC_INTERNAL_SERVER_ERROR); + + sseDispatch.doPut(URL, inboundRequest, outboundResponse); + + latch.await(1L, TimeUnit.SECONDS); + EasyMock.verify(asyncContext, outboundResponse, inboundRequest); + assertTrue(MOCK_SSE_SERVER.isEmpty()); + } + + @Test + public void testPatch2xx() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + SSEDispatch sseDispatch = this.createDispatch(); + PrintWriter printWriter = EasyMock.createNiceMock(PrintWriter.class); + HttpServletResponse outboundResponse = this.getServletResponse(HttpStatus.SC_OK); + AsyncContext asyncContext = this.getAsyncContext(latch, outboundResponse); + HttpServletRequest inboundRequest = this.getHttpServletRequest(asyncContext); + + this.expectResponseBodyAndHeader(printWriter, outboundResponse); + replay(inboundRequest, asyncContext, outboundResponse, printWriter); + + MOCK_SSE_SERVER.expect() + .method("PATCH") + .pathInfo("/sse") + .header("request", "header") + .header("Accept", "text/event-stream") + .content("{\"request\":\"body\"}", StandardCharsets.UTF_8) + .respond() + .status(HttpStatus.SC_OK) + .content("id:1\ndata:data1\nevent:event1\n\ndata:data2\nevent:event2\nid:2\nretry:1\n:testing\n\n", StandardCharsets.UTF_8) + .header("response", "header") + .contentType("text/event-stream"); + + sseDispatch.doPatch(URL, inboundRequest, outboundResponse); + + latch.await(1L, TimeUnit.SECONDS); + EasyMock.verify(asyncContext, outboundResponse, inboundRequest, printWriter); + assertTrue(MOCK_SSE_SERVER.isEmpty()); + } + + @Test + public void testPatch4xx() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + SSEDispatch sseDispatch = this.createDispatch(); + HttpServletResponse outboundResponse = this.getServletResponse(HttpStatus.SC_NOT_FOUND); + AsyncContext asyncContext = this.getAsyncContext(latch, outboundResponse); + HttpServletRequest inboundRequest = this.getHttpServletRequest(asyncContext); + + replay(inboundRequest, asyncContext, outboundResponse); + + MOCK_SSE_SERVER.expect() + .method("PATCH") + .pathInfo("/sse") + .respond() + .status(HttpStatus.SC_NOT_FOUND); + + sseDispatch.doPatch(URL, inboundRequest, outboundResponse); + + latch.await(1L, TimeUnit.SECONDS); + EasyMock.verify(asyncContext, outboundResponse, inboundRequest); + assertTrue(MOCK_SSE_SERVER.isEmpty()); + } + + @Test + public void testPatch5xx() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + SSEDispatch sseDispatch = this.createDispatch(); + HttpServletResponse outboundResponse = this.getServletResponse(HttpStatus.SC_INTERNAL_SERVER_ERROR); + AsyncContext asyncContext = this.getAsyncContext(latch, outboundResponse); + HttpServletRequest inboundRequest = this.getHttpServletRequest(asyncContext); + + replay(inboundRequest, asyncContext, outboundResponse); + + MOCK_SSE_SERVER.expect() + .method("PATCH") + .pathInfo("/sse") + .respond() + .status(HttpStatus.SC_INTERNAL_SERVER_ERROR); + + sseDispatch.doPatch(URL, inboundRequest, outboundResponse); + + latch.await(1L, TimeUnit.SECONDS); + EasyMock.verify(asyncContext, outboundResponse, inboundRequest); + assertTrue(MOCK_SSE_SERVER.isEmpty()); + } + + @Test + public void testServerNotAvailable() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + SSEDispatch sseDispatch = this.createDispatch(); + HttpServletResponse outboundResponse = EasyMock.createNiceMock(HttpServletResponse.class); + AsyncContext asyncContext = this.getAsyncContext(latch, outboundResponse); + HttpServletRequest inboundRequest = this.getHttpServletRequest(asyncContext); + + replay(inboundRequest, asyncContext, outboundResponse); + + sseDispatch.doGet(new URI("http://localhost:11223/sse"), inboundRequest, outboundResponse); + + latch.await(1L, TimeUnit.SECONDS); + EasyMock.verify(asyncContext, outboundResponse, inboundRequest); + } + + private HttpServletRequest getHttpServletRequest(AsyncContext asyncContext) throws Exception { + Map headers = new HashMap<>(); + headers.put("request", "header"); + InputStream stream = new ByteArrayInputStream("{\"request\":\"body\"}".getBytes(StandardCharsets.UTF_8)); + MockServletInputStream mockServletInputStream = new MockServletInputStream(stream); + HttpServletRequest inboundRequest = EasyMock.createNiceMock(HttpServletRequest.class); + + EasyMock.expect(inboundRequest.getHeaderNames()).andReturn(Collections.enumeration(headers.keySet())).anyTimes(); + EasyMock.expect(inboundRequest.startAsync()).andReturn(asyncContext).once(); + EasyMock.expect(inboundRequest.getHeader("request")).andReturn("header").once(); + EasyMock.expect(inboundRequest.getContentType()).andReturn("application/json").anyTimes(); + EasyMock.expect(inboundRequest.getInputStream()).andReturn(mockServletInputStream).anyTimes(); + EasyMock.expect(inboundRequest.getContentLength()).andReturn(mockServletInputStream.available()).anyTimes(); + EasyMock.expect(inboundRequest.getServletContext()).andReturn(new MockServletContext()).anyTimes(); + + return inboundRequest; + } + + private AsyncContext getAsyncContext(CountDownLatch latch, HttpServletResponse outboundResponse) { + AsyncContext asyncContext = EasyMock.createNiceMock(AsyncContext.class); + + EasyMock.expect(asyncContext.getResponse()).andReturn(outboundResponse).anyTimes(); + asyncContext.complete(); + EasyMock.expectLastCall().andAnswer(() -> { + latch.countDown(); + return null; + }); + + return asyncContext; + } + + private HttpServletResponse getServletResponse(int statusCode) { + HttpServletResponse outboundResponse = EasyMock.createNiceMock(HttpServletResponse.class); + + outboundResponse.setStatus(statusCode); + EasyMock.expectLastCall(); + + return outboundResponse; + } + + private void expectResponseBodyAndHeader(PrintWriter printWriter, HttpServletResponse outboundResponse) throws Exception { + outboundResponse.addHeader("response", "header"); + EasyMock.expectLastCall(); + EasyMock.expect(outboundResponse.getWriter()).andReturn(printWriter).anyTimes(); + printWriter.write("id:1\nevent:event1\ndata:data1"); + EasyMock.expectLastCall(); + printWriter.write("id:2\nevent:event2\ndata:data2\nretry:1\n:testing"); + EasyMock.expectLastCall(); + printWriter.println('\n'); + EasyMock.expectLastCall().times(2); + } + + private SSEDispatch createDispatch() throws Exception { + KeystoreService keystoreService = createMock(KeystoreService.class); + expect(keystoreService.getTruststoreForHttpClient()).andReturn(null).once(); + + GatewayConfig gatewayConfig = createMock(GatewayConfig.class); + expect(gatewayConfig.isMetricsEnabled()).andReturn(false).once(); + expect(gatewayConfig.getHttpClientMaxConnections()).andReturn(32).once(); + expect(gatewayConfig.getHttpClientConnectionTimeout()).andReturn(20000).once(); + expect(gatewayConfig.getHttpClientSocketTimeout()).andReturn(20000).once(); + expect(gatewayConfig.getHttpClientCookieSpec()).andReturn(CookieSpecs.STANDARD).anyTimes(); + + GatewayServices gatewayServices = createMock(GatewayServices.class); + expect(gatewayServices.getService(ServiceType.KEYSTORE_SERVICE)).andReturn(keystoreService).once(); + + ServletContext servletContext = createMock(ServletContext.class); + expect(servletContext.getAttribute(GatewayConfig.GATEWAY_CONFIG_ATTRIBUTE)).andReturn(gatewayConfig).atLeastOnce(); + expect(servletContext.getAttribute(GatewayServices.GATEWAY_SERVICES_ATTRIBUTE)).andReturn(gatewayServices).atLeastOnce(); + + FilterConfig filterConfig = createMock(FilterConfig.class); + expect(filterConfig.getServletContext()).andReturn(servletContext).atLeastOnce(); + expect(filterConfig.getInitParameter("useTwoWaySsl")).andReturn("false").once(); + expect(filterConfig.getInitParameter("httpclient.maxConnections")).andReturn(null).once(); + expect(filterConfig.getInitParameter("httpclient.connectionTimeout")).andReturn(null).once(); + expect(filterConfig.getInitParameter("httpclient.socketTimeout")).andReturn(null).once(); + expect(filterConfig.getInitParameter("serviceRole")).andReturn(null).once(); + expect(filterConfig.getInitParameter("httpclient.cookieSpec")).andReturn(null).anyTimes(); + + replay(keystoreService, gatewayConfig, gatewayServices, servletContext, filterConfig); + + return new SSEDispatch(filterConfig); + } +} \ No newline at end of file diff --git a/gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEEntityTest.java b/gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEEntityTest.java new file mode 100644 index 0000000000..1c0375d9ab --- /dev/null +++ b/gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEEntityTest.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.knox.gateway.sse; + +import org.apache.http.HttpEntity; +import org.easymock.EasyMock; +import org.junit.Test; + +import java.nio.CharBuffer; +import java.util.concurrent.BlockingQueue; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + + +public class SSEEntityTest { + + private final HttpEntity entityMock = EasyMock.createNiceMock(HttpEntity.class); + + @Test + public void testParseSingleEvent() { + SSEEntity sseEntity = new SSEEntity(entityMock); + BlockingQueue eventQueue = sseEntity.getEventQueue(); + String unprocessedEvent = "id: 1\nevent: event\ndata: data\nretry:1\n:testing\n\n"; + CharBuffer cb = CharBuffer.wrap(unprocessedEvent); + + sseEntity.readCharBuffer(cb); + + assertFalse(eventQueue.isEmpty()); + + SSEvent actualSSEvent = eventQueue.peek(); + assertEquals("1", actualSSEvent.getId()); + assertEquals("event", actualSSEvent.getEvent()); + assertEquals("data", actualSSEvent.getData()); + assertEquals(1L, actualSSEvent.getRetry().longValue()); + assertEquals(":testing", actualSSEvent.getComment()); + } + + @Test + public void testParseMultipleEvents() { + SSEEntity sseEntity = new SSEEntity(this.entityMock); + BlockingQueue eventQueue = sseEntity.getEventQueue(); + String unprocessedEvents = "id: 1\nevent: event\ndata: data\n\nid: 2\nevent: event2\ndata: data2\n\nid: 3\nevent: event3\ndata: data3\nretry:1045\n:TEST\n\n"; + CharBuffer cb = CharBuffer.wrap(unprocessedEvents); + + sseEntity.readCharBuffer(cb); + + assertEquals(3, eventQueue.size()); + + SSEvent actualSSEvent = eventQueue.poll(); + assertEquals("1", actualSSEvent.getId()); + assertEquals("event", actualSSEvent.getEvent()); + assertEquals("data", actualSSEvent.getData()); + assertNull(actualSSEvent.getRetry()); + assertNull(actualSSEvent.getComment()); + + actualSSEvent = eventQueue.poll(); + assertEquals("2", actualSSEvent.getId()); + assertEquals("event2", actualSSEvent.getEvent()); + assertEquals("data2", actualSSEvent.getData()); + assertNull(actualSSEvent.getRetry()); + assertNull(actualSSEvent.getComment()); + + actualSSEvent = eventQueue.poll(); + assertEquals("3", actualSSEvent.getId()); + assertEquals("event3", actualSSEvent.getEvent()); + assertEquals("data3", actualSSEvent.getData()); + assertEquals(1045L, actualSSEvent.getRetry().longValue()); + assertEquals(":TEST", actualSSEvent.getComment()); + } + + @Test + public void testMissingNewLine() { + SSEEntity sseEntity = new SSEEntity(entityMock); + BlockingQueue eventQueue = sseEntity.getEventQueue(); + String unprocessedEvent = "id: 1\nevent: event\ndata: data\nretry:1045\n:TEST\n"; + CharBuffer cb = CharBuffer.wrap(unprocessedEvent); + + sseEntity.readCharBuffer(cb); + + assertTrue(eventQueue.isEmpty()); + } + + @Test + public void testParseEventWithSpecialChars() { + SSEEntity sseEntity = new SSEEntity(entityMock); + BlockingQueue eventQueue = sseEntity.getEventQueue(); + String unprocessedEvent = "id: 75a0a510-0065-498f:be39-c6f42a3fe4af\ndata: data:{\"records\":[{\"col_str\":\"0e01eeef73f6833a98e1df6a5a00ea46f5b52dbee27ee89ebce894aaa555c90130b08fae8aaf600ef845b774ab0082fcaf8c\",\"col_int\":-580163093,\"col_ts\":\"2024-08-14T07:41:15.125\"}],\"job_status\":\"RUNNING\",\"end_of_samples\":false}\nretry:33\n::::test\n\n"; + CharBuffer cb = CharBuffer.wrap(unprocessedEvent); + + sseEntity.readCharBuffer(cb); + + assertFalse(eventQueue.isEmpty()); + + SSEvent actualSSEvent = eventQueue.peek(); + assertEquals("75a0a510-0065-498f:be39-c6f42a3fe4af", actualSSEvent.getId()); + assertNull(actualSSEvent.getEvent()); + assertEquals("data:{\"records\":[{\"col_str\":\"0e01eeef73f6833a98e1df6a5a00ea46f5b52dbee27ee89ebce894aaa555c90130b08fae8aaf600ef845b774ab0082fcaf8c\",\"col_int\":-580163093,\"col_ts\":\"2024-08-14T07:41:15.125\"}],\"job_status\":\"RUNNING\",\"end_of_samples\":false}", actualSSEvent.getData()); + assertEquals(33L, actualSSEvent.getRetry().longValue()); + assertEquals("::::test", actualSSEvent.getComment()); + } + + @Test + public void testInvalidFormat() { + SSEEntity sseEntity = new SSEEntity(entityMock); + BlockingQueue eventQueue = sseEntity.getEventQueue(); + String unprocessedEvent = "id: 1\nevent: event\ndata: data\nretry:1045\n:TEST\nid: 1\nevent: event\ndata: data\nretry:1045\n:TEST\n"; + CharBuffer cb = CharBuffer.wrap(unprocessedEvent); + + sseEntity.readCharBuffer(cb); + + assertTrue(eventQueue.isEmpty()); + } + + @Test + public void testParseEventsWithDifferentNewLineChars() { + SSEEntity sseEntity = new SSEEntity(entityMock); + BlockingQueue eventQueue = sseEntity.getEventQueue(); + String unprocessedEvents = "id: 1\nevent: event\ndata: data\r\nid: 2\nevent: event2\ndata: data2\u2028\nid: 3\nevent: event3\ndata: data3\u2029\nid: 4\nevent: event4\ndata: data4\u0085\n"; + CharBuffer cb = CharBuffer.wrap(unprocessedEvents); + + sseEntity.readCharBuffer(cb); + + assertEquals(4, eventQueue.size()); + + SSEvent actualSSEvent = eventQueue.poll(); + assertEquals("1", actualSSEvent.getId()); + assertEquals("event", actualSSEvent.getEvent()); + assertEquals("data", actualSSEvent.getData()); + + actualSSEvent = eventQueue.poll(); + assertEquals("2", actualSSEvent.getId()); + assertEquals("event2", actualSSEvent.getEvent()); + assertEquals("data2", actualSSEvent.getData()); + + actualSSEvent = eventQueue.poll(); + assertEquals("3", actualSSEvent.getId()); + assertEquals("event3", actualSSEvent.getEvent()); + assertEquals("data3", actualSSEvent.getData()); + + actualSSEvent = eventQueue.poll(); + assertEquals("4", actualSSEvent.getId()); + assertEquals("event4", actualSSEvent.getEvent()); + assertEquals("data4", actualSSEvent.getData()); + } +} diff --git a/gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEventTest.java b/gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEventTest.java new file mode 100644 index 0000000000..378726ab33 --- /dev/null +++ b/gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEventTest.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.knox.gateway.sse; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class SSEventTest { + + + @Test + public void testToStringWithAll() { + SSEvent ssEvent = new SSEvent("data", "event", "id", ":comment", 5L); + String expected = "id:id\nevent:event\ndata:data\nretry:5\n:comment"; + + assertEquals(expected, ssEvent.toString()); + } + + @Test + public void testToStringNoId() { + SSEvent ssEventNull = new SSEvent("data", "event", null, ":test", 2L); + SSEvent ssEventEmpty = new SSEvent("data", "event", "", ":new comment", 1L); + String expectedForNull = "event:event\ndata:data\nretry:2\n:test"; + String expectedForEmpty = "id:\nevent:event\ndata:data\nretry:1\n:new comment"; + + assertEquals(expectedForNull, ssEventNull.toString()); + assertEquals(expectedForEmpty, ssEventEmpty.toString()); + } + + @Test + public void testToStringNoEvent() { + SSEvent ssEventNull = new SSEvent("data", null, "id", ":comment", 11L); + SSEvent ssEventEmpty = new SSEvent("data", "", "id", ":test comment", 30L); + String expectedForNull = "id:id\ndata:data\nretry:11\n:comment"; + String expectedForEmpty = "id:id\nevent:\ndata:data\nretry:30\n:test comment"; + + assertEquals(expectedForNull, ssEventNull.toString()); + assertEquals(expectedForEmpty, ssEventEmpty.toString()); + } + + @Test + public void testToStringNoData() { + SSEvent ssEventNull = new SSEvent(null, "event", "id", ":comment", 2L); + SSEvent ssEventEmpty = new SSEvent("", "event", "id", ":testing", 1L); + String expectedForNull = "id:id\nevent:event\nretry:2\n:comment"; + String expectedForEmpty = "id:id\nevent:event\ndata:\nretry:1\n:testing"; + + assertEquals(expectedForNull, ssEventNull.toString()); + assertEquals(expectedForEmpty, ssEventEmpty.toString()); + } + + @Test + public void testToStringNoComment() { + SSEvent ssEventNull = new SSEvent("data", "event", "id", null, 3L); + String expected = "id:id\nevent:event\ndata:data\nretry:3"; + + assertEquals(expected, ssEventNull.toString()); + } + + @Test + public void testToStringNoRetry() { + SSEvent ssEventNull = new SSEvent("data", "event", "id", ":TEST", null); + String expected = "id:id\nevent:event\ndata:data\n:TEST"; + + assertEquals(expected, ssEventNull.toString()); + } +} \ No newline at end of file diff --git a/pom.xml b/pom.xml index 9bd91b46cf..b255e0520e 100644 --- a/pom.xml +++ b/pom.xml @@ -167,6 +167,7 @@ 1.8.1 9.0 1.9.6 + 4.1.5 1.67 2.8.8 1.4 @@ -1549,6 +1550,16 @@ httpcore ${httpcore.version} + + org.apache.httpcomponents + httpasyncclient + ${asynchttpclient.version} + + + org.apache.httpcomponents + httpcore-nio + ${httpcore.version} + joda-time