+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.knox.gateway.dispatch;
+
+import org.apache.http.auth.AuthSchemeProvider;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.client.config.AuthSchemes;
+import org.apache.http.config.Registry;
+import org.apache.http.config.RegistryBuilder;
+import org.apache.http.impl.DefaultConnectionReuseStrategy;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.client.DefaultConnectionKeepAliveStrategy;
+import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
+import org.apache.http.impl.nio.client.HttpAsyncClients;
+import org.apache.http.nio.client.HttpAsyncClient;
+import org.apache.knox.gateway.config.GatewayConfig;
+import org.apache.knox.gateway.services.GatewayServices;
+import org.apache.knox.gateway.services.ServiceType;
+import org.apache.knox.gateway.services.metrics.MetricsService;
+
+import javax.net.ssl.SSLContext;
+import javax.servlet.FilterConfig;
+
+public class DefaultHttpAsyncClientFactory extends DefaultHttpClientFactory implements HttpAsyncClientFactory {
+
+ @Override
+ public HttpAsyncClient createAsyncHttpClient(FilterConfig filterConfig) {
+ final String serviceRole = filterConfig.getInitParameter(PARAMETER_SERVICE_ROLE);
+ HttpAsyncClientBuilder builder;
+ GatewayConfig gatewayConfig = (GatewayConfig) filterConfig.getServletContext().getAttribute(GatewayConfig.GATEWAY_CONFIG_ATTRIBUTE);
+ GatewayServices services = (GatewayServices) filterConfig.getServletContext()
+ .getAttribute(GatewayServices.GATEWAY_SERVICES_ATTRIBUTE);
+ if (gatewayConfig != null && gatewayConfig.isMetricsEnabled()) {
+ MetricsService metricsService = services.getService(ServiceType.METRICS_SERVICE);
+ builder = metricsService.getInstrumented(HttpAsyncClientBuilder.class);
+ } else {
+ builder = HttpAsyncClients.custom();
+ }
+
+ // Conditionally set a custom SSLContext
+ SSLContext sslContext = createSSLContext(services, filterConfig, serviceRole);
+ if (sslContext != null) {
+ builder.setSSLContext(sslContext);
+ }
+
+ if (Boolean.parseBoolean(System.getProperty(GatewayConfig.HADOOP_KERBEROS_SECURED))) {
+ CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+ credentialsProvider.setCredentials(AuthScope.ANY, new DefaultHttpAsyncClientFactory.UseJaasCredentials());
+
+ Registry
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.knox.gateway.sse;
+
+import org.apache.http.HttpHeaders;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPatch;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.concurrent.FutureCallback;
+import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
+import org.apache.http.nio.IOControl;
+import org.apache.http.nio.client.HttpAsyncClient;
+import org.apache.http.nio.client.methods.AsyncCharConsumer;
+import org.apache.http.nio.client.methods.HttpAsyncMethods;
+import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
+import org.apache.http.protocol.HttpContext;
+import org.apache.knox.gateway.audit.api.Action;
+import org.apache.knox.gateway.audit.api.ActionOutcome;
+import org.apache.knox.gateway.audit.api.ResourceType;
+import org.apache.knox.gateway.dispatch.ConfigurableDispatch;
+import org.apache.knox.gateway.dispatch.DefaultHttpAsyncClientFactory;
+import org.apache.knox.gateway.dispatch.HttpAsyncClientFactory;
+
+import javax.servlet.AsyncContext;
+import javax.servlet.FilterConfig;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.CharBuffer;
+import java.nio.charset.StandardCharsets;
+
+public class SSEDispatch extends ConfigurableDispatch {
+
+ private final HttpAsyncClient asyncClient;
+ private static final String TEXT_EVENT_STREAM_VALUE = "text/event-stream";
+
+ public SSEDispatch(FilterConfig filterConfig) {
+ HttpAsyncClientFactory asyncClientFactory = new DefaultHttpAsyncClientFactory();
+ this.asyncClient = asyncClientFactory.createAsyncHttpClient(filterConfig);
+
+ if (asyncClient instanceof CloseableHttpAsyncClient) {
+ ((CloseableHttpAsyncClient) this.asyncClient).start();
+ }
+ }
+
+ @Override
+ public void doGet(URI url, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse)
+ throws IOException {
+ final HttpGet httpGetRequest = new HttpGet(url);
+ this.doHttpMethod(httpGetRequest, inboundRequest, outboundResponse);
+ }
+
+ @Override
+ public void doPost(URI url, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse)
+ throws IOException, URISyntaxException {
+ final HttpPost httpPostRequest = new HttpPost(url);
+ httpPostRequest.setEntity(this.createRequestEntity(inboundRequest));
+ this.doHttpMethod(httpPostRequest, inboundRequest, outboundResponse);
+ }
+
+ @Override
+ public void doPut(URI url, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse)
+ throws IOException {
+ final HttpPut httpPutRequest = new HttpPut(url);
+ httpPutRequest.setEntity(this.createRequestEntity(inboundRequest));
+ this.doHttpMethod(httpPutRequest, inboundRequest, outboundResponse);
+ }
+
+ @Override
+ public void doPatch(URI url, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse)
+ throws IOException {
+ final HttpPatch httpPatchRequest = new HttpPatch(url);
+ httpPatchRequest.setEntity(this.createRequestEntity(inboundRequest));
+ this.doHttpMethod(httpPatchRequest, inboundRequest, outboundResponse);
+ }
+
+ private void doHttpMethod(HttpUriRequest httpMethod, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse) throws IOException {
+ this.addAcceptHeader(httpMethod);
+ this.copyRequestHeaderFields(httpMethod, inboundRequest);
+ this.executeRequest(httpMethod, outboundResponse, inboundRequest);
+ }
+
+ private void executeRequest(HttpUriRequest outboundRequest, HttpServletResponse outboundResponse, HttpServletRequest inboundRequest) {
+ AsyncContext asyncContext = inboundRequest.startAsync();
+ //No timeout
+ asyncContext.setTimeout(0L);
+
+ HttpAsyncRequestProducer producer = HttpAsyncMethods.create(outboundRequest);
+ AsyncCharConsumer
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.knox.gateway.sse;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.entity.AbstractHttpEntity;
+
+import javax.servlet.AsyncContext;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.CharBuffer;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class SSEEntity extends AbstractHttpEntity {
+
+ private static final String SSE_DELIMITER = ":";
+
+ private final BlockingQueue
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.knox.gateway.sse;
+
+public class SSEException extends RuntimeException {
+
+ public SSEException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git a/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEResponse.java b/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEResponse.java
new file mode 100644
index 0000000000..47b7bfa16d
--- /dev/null
+++ b/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEResponse.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.knox.gateway.sse;
+
+import org.apache.http.Header;
+import org.apache.http.HeaderIterator;
+import org.apache.http.HttpResponse;
+import org.apache.http.ProtocolVersion;
+import org.apache.http.StatusLine;
+import org.apache.http.message.BasicHttpResponse;
+
+import java.util.Locale;
+
+public class SSEResponse extends BasicHttpResponse {
+ private final HttpResponse inboundResponse;
+ private final SSEEntity entity;
+
+ public SSEResponse(HttpResponse inboundResponse) {
+ super(inboundResponse.getStatusLine());
+ this.inboundResponse = inboundResponse;
+ this.entity = new SSEEntity(inboundResponse.getEntity());
+ }
+
+ @Override
+ public SSEEntity getEntity() {
+ return entity;
+ }
+
+ @Override
+ public StatusLine getStatusLine() {
+ return inboundResponse.getStatusLine();
+ }
+
+ @Override
+ public void setStatusLine(StatusLine statusline) {
+ inboundResponse.setStatusLine(statusline);
+ }
+
+ @Override
+ public void setStatusLine(ProtocolVersion ver, int code) {
+ inboundResponse.setStatusLine(ver, code);
+ }
+
+ @Override
+ public void setStatusLine(ProtocolVersion ver, int code, String reason) {
+ inboundResponse.setStatusLine(ver, code, reason);
+ }
+
+ @Override
+ public void setStatusCode(int code) throws IllegalStateException {
+ inboundResponse.setStatusCode(code);
+ }
+
+ @Override
+ public void setReasonPhrase(String reason) throws IllegalStateException {
+ inboundResponse.setReasonPhrase(reason);
+ }
+
+ @Override
+ public Locale getLocale() {
+ return inboundResponse.getLocale();
+ }
+
+ @Override
+ public void setLocale(Locale loc) {
+ inboundResponse.setLocale(loc);
+ }
+
+ @Override
+ public ProtocolVersion getProtocolVersion() {
+ return inboundResponse.getProtocolVersion();
+ }
+
+ @Override
+ public boolean containsHeader(String name) {
+ return inboundResponse.containsHeader(name);
+ }
+
+ @Override
+ public Header[] getHeaders(String name) {
+ return inboundResponse.getHeaders(name);
+ }
+
+ @Override
+ public Header getFirstHeader(String name) {
+ return inboundResponse.getFirstHeader(name);
+ }
+
+ @Override
+ public Header getLastHeader(String name) {
+ return inboundResponse.getLastHeader(name);
+ }
+
+ @Override
+ public Header[] getAllHeaders() {
+ return inboundResponse.getAllHeaders();
+ }
+
+ @Override
+ public void addHeader(Header header) {
+ inboundResponse.addHeader(header);
+ }
+
+ @Override
+ public void addHeader(String name, String value) {
+ inboundResponse.addHeader(name, value);
+ }
+
+ @Override
+ public void setHeader(Header header) {
+ inboundResponse.setHeader(header);
+ }
+
+ @Override
+ public void setHeader(String name, String value) {
+ inboundResponse.setHeader(name, value);
+ }
+
+ @Override
+ public void setHeaders(Header[] headers) {
+ inboundResponse.setHeaders(headers);
+ }
+
+ @Override
+ public void removeHeader(Header header) {
+ inboundResponse.removeHeader(header);
+ }
+
+ @Override
+ public void removeHeaders(String name) {
+ inboundResponse.removeHeaders(name);
+ }
+
+ @Override
+ public HeaderIterator headerIterator() {
+ return inboundResponse.headerIterator();
+ }
+
+ @Override
+ public HeaderIterator headerIterator(String name) {
+ return inboundResponse.headerIterator(name);
+ }
+}
diff --git a/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEvent.java b/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEvent.java
new file mode 100644
index 0000000000..dc643ed0e7
--- /dev/null
+++ b/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEvent.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.knox.gateway.sse;
+
+public class SSEvent {
+
+ private String data;
+ private String event;
+ private String id;
+ private String comment;
+ private Long retry;
+
+ public SSEvent() {
+ }
+
+ public SSEvent(String data, String event, String id, String comment, Long retry) {
+ this.data = data;
+ this.event = event;
+ this.id = id;
+ this.comment = comment;
+ this.retry = retry;
+ }
+
+ public void setData(String data) {
+ this.data = data;
+ }
+
+ public void setEvent(String event) {
+ this.event = event;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public void setComment(String comment) {
+ this.comment = comment;
+ }
+
+ public void setRetry(Long retry) {
+ this.retry = retry;
+ }
+
+ public String getData() {
+ return this.data;
+ }
+
+ public String getEvent() {
+ return this.event;
+ }
+
+ public String getId() {
+ return this.id;
+ }
+
+ public String getComment() {
+ return this.comment;
+ }
+
+ public Long getRetry() {
+ return this.retry;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder eventString = new StringBuilder();
+
+ this.appendField(eventString, this.id, "id:");
+ this.appendField(eventString, this.event, "event:");
+ this.appendField(eventString, this.data, "data:");
+ this.appendField(eventString, this.retry, "retry:");
+ this.appendField(eventString, this.comment, "");
+
+ return eventString.toString();
+ }
+
+ private void appendField(StringBuilder eventString, Object field, String prefix) {
+ if (field != null) {
+ if (eventString.length() != 0) {
+ eventString.append('\n');
+ }
+ eventString.append(prefix);
+ eventString.append(field);
+ }
+ }
+}
diff --git a/gateway-spi/src/test/java/org/apache/knox/gateway/dispatch/DefaultHttpAsyncClientFactoryTest.java b/gateway-spi/src/test/java/org/apache/knox/gateway/dispatch/DefaultHttpAsyncClientFactoryTest.java
new file mode 100644
index 0000000000..001f830c2b
--- /dev/null
+++ b/gateway-spi/src/test/java/org/apache/knox/gateway/dispatch/DefaultHttpAsyncClientFactoryTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.knox.gateway.dispatch;
+
+import org.apache.http.client.config.CookieSpecs;
+import org.apache.http.nio.client.HttpAsyncClient;
+import org.apache.knox.gateway.config.GatewayConfig;
+import org.apache.knox.gateway.services.GatewayServices;
+import org.apache.knox.gateway.services.ServiceType;
+import org.apache.knox.gateway.services.security.KeystoreService;
+import org.junit.Test;
+
+import javax.servlet.FilterConfig;
+import javax.servlet.ServletContext;
+
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+import static org.junit.Assert.assertNotNull;
+
+public class DefaultHttpAsyncClientFactoryTest {
+
+ @Test
+ public void testCreateHttpAsyncClientSSLContextDefaults() throws Exception {
+ KeystoreService keystoreService = createMock(KeystoreService.class);
+ expect(keystoreService.getTruststoreForHttpClient()).andReturn(null).once();
+
+ GatewayConfig gatewayConfig = createMock(GatewayConfig.class);
+ expect(gatewayConfig.isMetricsEnabled()).andReturn(false).once();
+ expect(gatewayConfig.getHttpClientMaxConnections()).andReturn(32).once();
+ expect(gatewayConfig.getHttpClientConnectionTimeout()).andReturn(20000).once();
+ expect(gatewayConfig.getHttpClientSocketTimeout()).andReturn(20000).once();
+ expect(gatewayConfig.getHttpClientCookieSpec()).andReturn(CookieSpecs.STANDARD).anyTimes();
+
+ GatewayServices gatewayServices = createMock(GatewayServices.class);
+ expect(gatewayServices.getService(ServiceType.KEYSTORE_SERVICE)).andReturn(keystoreService).once();
+
+ ServletContext servletContext = createMock(ServletContext.class);
+ expect(servletContext.getAttribute(GatewayConfig.GATEWAY_CONFIG_ATTRIBUTE)).andReturn(gatewayConfig).atLeastOnce();
+ expect(servletContext.getAttribute(GatewayServices.GATEWAY_SERVICES_ATTRIBUTE)).andReturn(gatewayServices).atLeastOnce();
+
+ FilterConfig filterConfig = createMock(FilterConfig.class);
+ expect(filterConfig.getServletContext()).andReturn(servletContext).atLeastOnce();
+ expect(filterConfig.getInitParameter("useTwoWaySsl")).andReturn("false").once();
+ expect(filterConfig.getInitParameter("httpclient.maxConnections")).andReturn(null).once();
+ expect(filterConfig.getInitParameter("httpclient.connectionTimeout")).andReturn(null).once();
+ expect(filterConfig.getInitParameter("httpclient.socketTimeout")).andReturn(null).once();
+ expect(filterConfig.getInitParameter("serviceRole")).andReturn(null).once();
+ expect(filterConfig.getInitParameter("httpclient.cookieSpec")).andReturn(null).anyTimes();
+
+ replay(keystoreService, gatewayConfig, gatewayServices, servletContext, filterConfig);
+
+ DefaultHttpAsyncClientFactory factory = new DefaultHttpAsyncClientFactory();
+ HttpAsyncClient client = factory.createAsyncHttpClient(filterConfig);
+ assertNotNull(client);
+
+ verify(keystoreService, gatewayConfig, gatewayServices, servletContext, filterConfig);
+ }
+}
diff --git a/gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEDispatchTest.java b/gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEDispatchTest.java
new file mode 100644
index 0000000000..66abcb7913
--- /dev/null
+++ b/gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEDispatchTest.java
@@ -0,0 +1,483 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.knox.gateway.sse;
+
+import org.apache.http.HttpStatus;
+import org.apache.http.client.config.CookieSpecs;
+import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
+import org.apache.knox.gateway.config.GatewayConfig;
+import org.apache.knox.gateway.services.GatewayServices;
+import org.apache.knox.gateway.services.ServiceType;
+import org.apache.knox.gateway.services.security.KeystoreService;
+import org.apache.knox.test.mock.MockServer;
+import org.apache.knox.test.mock.MockServletContext;
+import org.apache.knox.test.mock.MockServletInputStream;
+import org.easymock.EasyMock;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import javax.servlet.AsyncContext;
+import javax.servlet.FilterConfig;
+import javax.servlet.ServletContext;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.io.PrintWriter;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class SSEDispatchTest {
+
+ private static MockServer MOCK_SSE_SERVER;
+ private static URI URL;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ MOCK_SSE_SERVER = new MockServer("SSE", true);
+ URL = new URI("http://localhost:" + MOCK_SSE_SERVER.getPort() + "/sse");
+ }
+
+ @Test
+ public void testCreateAndDestroyClient() throws Exception {
+ SSEDispatch sseDispatch = this.createDispatch();
+ assertNotNull(sseDispatch.getAsyncClient());
+
+ sseDispatch.destroy();
+ assertFalse(((CloseableHttpAsyncClient) sseDispatch.getAsyncClient()).isRunning());
+ }
+
+ @Test
+ public void testGet2xx() throws Exception {
+ CountDownLatch latch = new CountDownLatch(1);
+ SSEDispatch sseDispatch = this.createDispatch();
+ PrintWriter printWriter = EasyMock.createNiceMock(PrintWriter.class);
+ HttpServletResponse outboundResponse = this.getServletResponse(HttpStatus.SC_OK);
+ AsyncContext asyncContext = this.getAsyncContext(latch, outboundResponse);
+ HttpServletRequest inboundRequest = this.getHttpServletRequest(asyncContext);
+
+ this.expectResponseBodyAndHeader(printWriter, outboundResponse);
+ replay(inboundRequest, asyncContext, outboundResponse, printWriter);
+
+ MOCK_SSE_SERVER.expect()
+ .method("GET")
+ .pathInfo("/sse")
+ .header("request", "header")
+ .header("Accept", "text/event-stream")
+ .respond()
+ .status(HttpStatus.SC_OK)
+ .content("id:1\ndata:data1\nevent:event1\n\ndata:data2\nevent:event2\nid:2\nretry:1\n:testing\n\n", StandardCharsets.UTF_8)
+ .header("response", "header")
+ .contentType("text/event-stream");
+
+ sseDispatch.doGet(URL, inboundRequest, outboundResponse);
+
+ latch.await(1L, TimeUnit.SECONDS);
+ EasyMock.verify(asyncContext, outboundResponse, inboundRequest, printWriter);
+ assertTrue(MOCK_SSE_SERVER.isEmpty());
+ }
+
+ @Test
+ public void testGet4xx() throws Exception {
+ CountDownLatch latch = new CountDownLatch(1);
+ SSEDispatch sseDispatch = this.createDispatch();
+ HttpServletResponse outboundResponse = this.getServletResponse(HttpStatus.SC_BAD_REQUEST);
+ AsyncContext asyncContext = this.getAsyncContext(latch, outboundResponse);
+ HttpServletRequest inboundRequest = this.getHttpServletRequest(asyncContext);
+
+ replay(inboundRequest, asyncContext, outboundResponse);
+
+ MOCK_SSE_SERVER.expect()
+ .method("GET")
+ .pathInfo("/sse")
+ .respond()
+ .status(HttpStatus.SC_BAD_REQUEST);
+
+ sseDispatch.doGet(URL, inboundRequest, outboundResponse);
+
+ latch.await(1L, TimeUnit.SECONDS);
+ EasyMock.verify(asyncContext, outboundResponse, inboundRequest);
+ assertTrue(MOCK_SSE_SERVER.isEmpty());
+ }
+
+ @Test
+ public void testGet5xx() throws Exception {
+ CountDownLatch latch = new CountDownLatch(1);
+ SSEDispatch sseDispatch = this.createDispatch();
+ HttpServletResponse outboundResponse = this.getServletResponse(HttpStatus.SC_INTERNAL_SERVER_ERROR);
+ AsyncContext asyncContext = this.getAsyncContext(latch, outboundResponse);
+ HttpServletRequest inboundRequest = this.getHttpServletRequest(asyncContext);
+
+ replay(inboundRequest, asyncContext, outboundResponse);
+
+ MOCK_SSE_SERVER.expect()
+ .method("GET")
+ .pathInfo("/sse")
+ .respond()
+ .status(HttpStatus.SC_INTERNAL_SERVER_ERROR);
+
+ sseDispatch.doGet(URL, inboundRequest, outboundResponse);
+
+ latch.await(1L, TimeUnit.SECONDS);
+ EasyMock.verify(asyncContext, outboundResponse, inboundRequest);
+ assertTrue(MOCK_SSE_SERVER.isEmpty());
+ }
+
+ @Test
+ public void testPost2xx() throws Exception {
+ CountDownLatch latch = new CountDownLatch(1);
+ SSEDispatch sseDispatch = this.createDispatch();
+ PrintWriter printWriter = EasyMock.createNiceMock(PrintWriter.class);
+ HttpServletResponse outboundResponse = this.getServletResponse(HttpStatus.SC_OK);
+ AsyncContext asyncContext = this.getAsyncContext(latch, outboundResponse);
+ HttpServletRequest inboundRequest = this.getHttpServletRequest(asyncContext);
+
+ this.expectResponseBodyAndHeader(printWriter, outboundResponse);
+ replay(inboundRequest, asyncContext, outboundResponse, printWriter);
+
+ MOCK_SSE_SERVER.expect()
+ .method("POST")
+ .pathInfo("/sse")
+ .header("request", "header")
+ .header("Accept", "text/event-stream")
+ .content("{\"request\":\"body\"}", StandardCharsets.UTF_8)
+ .respond()
+ .status(HttpStatus.SC_OK)
+ .content("id:1\ndata:data1\nevent:event1\n\ndata:data2\nevent:event2\nid:2\nretry:1\n:testing\n\n", StandardCharsets.UTF_8)
+ .header("response", "header")
+ .contentType("text/event-stream");
+
+ sseDispatch.doPost(URL, inboundRequest, outboundResponse);
+
+ latch.await(1L, TimeUnit.SECONDS);
+ EasyMock.verify(asyncContext, outboundResponse, inboundRequest, printWriter);
+ assertTrue(MOCK_SSE_SERVER.isEmpty());
+ }
+
+ @Test
+ public void testPost4xx() throws Exception {
+ CountDownLatch latch = new CountDownLatch(1);
+ SSEDispatch sseDispatch = this.createDispatch();
+ HttpServletResponse outboundResponse = this.getServletResponse(HttpStatus.SC_NOT_FOUND);
+ AsyncContext asyncContext = this.getAsyncContext(latch, outboundResponse);
+ HttpServletRequest inboundRequest = this.getHttpServletRequest(asyncContext);
+
+ replay(inboundRequest, asyncContext, outboundResponse);
+
+ MOCK_SSE_SERVER.expect()
+ .method("POST")
+ .pathInfo("/sse")
+ .respond()
+ .status(HttpStatus.SC_NOT_FOUND);
+
+ sseDispatch.doPost(URL, inboundRequest, outboundResponse);
+
+ latch.await(1L, TimeUnit.SECONDS);
+ EasyMock.verify(asyncContext, outboundResponse, inboundRequest);
+ assertTrue(MOCK_SSE_SERVER.isEmpty());
+ }
+
+ @Test
+ public void testPost5xx() throws Exception {
+ CountDownLatch latch = new CountDownLatch(1);
+ SSEDispatch sseDispatch = this.createDispatch();
+ HttpServletResponse outboundResponse = this.getServletResponse(HttpStatus.SC_INTERNAL_SERVER_ERROR);
+ AsyncContext asyncContext = this.getAsyncContext(latch, outboundResponse);
+ HttpServletRequest inboundRequest = this.getHttpServletRequest(asyncContext);
+
+ replay(inboundRequest, asyncContext, outboundResponse);
+
+ MOCK_SSE_SERVER.expect()
+ .method("POST")
+ .pathInfo("/sse")
+ .respond()
+ .status(HttpStatus.SC_INTERNAL_SERVER_ERROR);
+
+ sseDispatch.doPost(URL, inboundRequest, outboundResponse);
+
+ latch.await(1L, TimeUnit.SECONDS);
+ EasyMock.verify(asyncContext, outboundResponse, inboundRequest);
+ assertTrue(MOCK_SSE_SERVER.isEmpty());
+ }
+
+ @Test
+ public void testPut2xx() throws Exception {
+ CountDownLatch latch = new CountDownLatch(1);
+ SSEDispatch sseDispatch = this.createDispatch();
+ PrintWriter printWriter = EasyMock.createNiceMock(PrintWriter.class);
+ HttpServletResponse outboundResponse = this.getServletResponse(HttpStatus.SC_OK);
+ AsyncContext asyncContext = this.getAsyncContext(latch, outboundResponse);
+ HttpServletRequest inboundRequest = this.getHttpServletRequest(asyncContext);
+
+ this.expectResponseBodyAndHeader(printWriter, outboundResponse);
+ replay(inboundRequest, asyncContext, outboundResponse, printWriter);
+
+ MOCK_SSE_SERVER.expect()
+ .method("PUT")
+ .pathInfo("/sse")
+ .header("request", "header")
+ .header("Accept", "text/event-stream")
+ .content("{\"request\":\"body\"}", StandardCharsets.UTF_8)
+ .respond()
+ .status(HttpStatus.SC_OK)
+ .content("id:1\ndata:data1\nevent:event1\n\ndata:data2\nevent:event2\nid:2\nretry:1\n:testing\n\n", StandardCharsets.UTF_8)
+ .header("response", "header")
+ .contentType("text/event-stream");
+
+ sseDispatch.doPut(URL, inboundRequest, outboundResponse);
+
+ latch.await(1L, TimeUnit.SECONDS);
+ EasyMock.verify(asyncContext, outboundResponse, inboundRequest, printWriter);
+ assertTrue(MOCK_SSE_SERVER.isEmpty());
+ }
+
+ @Test
+ public void testPut4xx() throws Exception {
+ CountDownLatch latch = new CountDownLatch(1);
+ SSEDispatch sseDispatch = this.createDispatch();
+ HttpServletResponse outboundResponse = this.getServletResponse(HttpStatus.SC_NOT_FOUND);
+ AsyncContext asyncContext = this.getAsyncContext(latch, outboundResponse);
+ HttpServletRequest inboundRequest = this.getHttpServletRequest(asyncContext);
+
+ replay(inboundRequest, asyncContext, outboundResponse);
+
+ MOCK_SSE_SERVER.expect()
+ .method("PUT")
+ .pathInfo("/sse")
+ .respond()
+ .status(HttpStatus.SC_NOT_FOUND);
+
+ sseDispatch.doPut(URL, inboundRequest, outboundResponse);
+
+ latch.await(1L, TimeUnit.SECONDS);
+ EasyMock.verify(asyncContext, outboundResponse, inboundRequest);
+ assertTrue(MOCK_SSE_SERVER.isEmpty());
+ }
+
+ @Test
+ public void testPut5xx() throws Exception {
+ CountDownLatch latch = new CountDownLatch(1);
+ SSEDispatch sseDispatch = this.createDispatch();
+ HttpServletResponse outboundResponse = this.getServletResponse(HttpStatus.SC_INTERNAL_SERVER_ERROR);
+ AsyncContext asyncContext = this.getAsyncContext(latch, outboundResponse);
+ HttpServletRequest inboundRequest = this.getHttpServletRequest(asyncContext);
+
+ replay(inboundRequest, asyncContext, outboundResponse);
+
+ MOCK_SSE_SERVER.expect()
+ .method("PUT")
+ .pathInfo("/sse")
+ .respond()
+ .status(HttpStatus.SC_INTERNAL_SERVER_ERROR);
+
+ sseDispatch.doPut(URL, inboundRequest, outboundResponse);
+
+ latch.await(1L, TimeUnit.SECONDS);
+ EasyMock.verify(asyncContext, outboundResponse, inboundRequest);
+ assertTrue(MOCK_SSE_SERVER.isEmpty());
+ }
+
+ @Test
+ public void testPatch2xx() throws Exception {
+ CountDownLatch latch = new CountDownLatch(1);
+ SSEDispatch sseDispatch = this.createDispatch();
+ PrintWriter printWriter = EasyMock.createNiceMock(PrintWriter.class);
+ HttpServletResponse outboundResponse = this.getServletResponse(HttpStatus.SC_OK);
+ AsyncContext asyncContext = this.getAsyncContext(latch, outboundResponse);
+ HttpServletRequest inboundRequest = this.getHttpServletRequest(asyncContext);
+
+ this.expectResponseBodyAndHeader(printWriter, outboundResponse);
+ replay(inboundRequest, asyncContext, outboundResponse, printWriter);
+
+ MOCK_SSE_SERVER.expect()
+ .method("PATCH")
+ .pathInfo("/sse")
+ .header("request", "header")
+ .header("Accept", "text/event-stream")
+ .content("{\"request\":\"body\"}", StandardCharsets.UTF_8)
+ .respond()
+ .status(HttpStatus.SC_OK)
+ .content("id:1\ndata:data1\nevent:event1\n\ndata:data2\nevent:event2\nid:2\nretry:1\n:testing\n\n", StandardCharsets.UTF_8)
+ .header("response", "header")
+ .contentType("text/event-stream");
+
+ sseDispatch.doPatch(URL, inboundRequest, outboundResponse);
+
+ latch.await(1L, TimeUnit.SECONDS);
+ EasyMock.verify(asyncContext, outboundResponse, inboundRequest, printWriter);
+ assertTrue(MOCK_SSE_SERVER.isEmpty());
+ }
+
+ @Test
+ public void testPatch4xx() throws Exception {
+ CountDownLatch latch = new CountDownLatch(1);
+ SSEDispatch sseDispatch = this.createDispatch();
+ HttpServletResponse outboundResponse = this.getServletResponse(HttpStatus.SC_NOT_FOUND);
+ AsyncContext asyncContext = this.getAsyncContext(latch, outboundResponse);
+ HttpServletRequest inboundRequest = this.getHttpServletRequest(asyncContext);
+
+ replay(inboundRequest, asyncContext, outboundResponse);
+
+ MOCK_SSE_SERVER.expect()
+ .method("PATCH")
+ .pathInfo("/sse")
+ .respond()
+ .status(HttpStatus.SC_NOT_FOUND);
+
+ sseDispatch.doPatch(URL, inboundRequest, outboundResponse);
+
+ latch.await(1L, TimeUnit.SECONDS);
+ EasyMock.verify(asyncContext, outboundResponse, inboundRequest);
+ assertTrue(MOCK_SSE_SERVER.isEmpty());
+ }
+
+ @Test
+ public void testPatch5xx() throws Exception {
+ CountDownLatch latch = new CountDownLatch(1);
+ SSEDispatch sseDispatch = this.createDispatch();
+ HttpServletResponse outboundResponse = this.getServletResponse(HttpStatus.SC_INTERNAL_SERVER_ERROR);
+ AsyncContext asyncContext = this.getAsyncContext(latch, outboundResponse);
+ HttpServletRequest inboundRequest = this.getHttpServletRequest(asyncContext);
+
+ replay(inboundRequest, asyncContext, outboundResponse);
+
+ MOCK_SSE_SERVER.expect()
+ .method("PATCH")
+ .pathInfo("/sse")
+ .respond()
+ .status(HttpStatus.SC_INTERNAL_SERVER_ERROR);
+
+ sseDispatch.doPatch(URL, inboundRequest, outboundResponse);
+
+ latch.await(1L, TimeUnit.SECONDS);
+ EasyMock.verify(asyncContext, outboundResponse, inboundRequest);
+ assertTrue(MOCK_SSE_SERVER.isEmpty());
+ }
+
+ @Test
+ public void testServerNotAvailable() throws Exception {
+ CountDownLatch latch = new CountDownLatch(1);
+ SSEDispatch sseDispatch = this.createDispatch();
+ HttpServletResponse outboundResponse = EasyMock.createNiceMock(HttpServletResponse.class);
+ AsyncContext asyncContext = this.getAsyncContext(latch, outboundResponse);
+ HttpServletRequest inboundRequest = this.getHttpServletRequest(asyncContext);
+
+ replay(inboundRequest, asyncContext, outboundResponse);
+
+ sseDispatch.doGet(new URI("http://localhost:11223/sse"), inboundRequest, outboundResponse);
+
+ latch.await(1L, TimeUnit.SECONDS);
+ EasyMock.verify(asyncContext, outboundResponse, inboundRequest);
+ }
+
+ private HttpServletRequest getHttpServletRequest(AsyncContext asyncContext) throws Exception {
+ Map