diff --git a/money-core/src/main/scala/com/comcast/money/core/concurrent/TraceFriendlyThreadPoolExecutor.scala b/money-core/src/main/scala/com/comcast/money/core/concurrent/TraceFriendlyThreadPoolExecutor.scala index 11d0f1f7..358d4112 100644 --- a/money-core/src/main/scala/com/comcast/money/core/concurrent/TraceFriendlyThreadPoolExecutor.scala +++ b/money-core/src/main/scala/com/comcast/money/core/concurrent/TraceFriendlyThreadPoolExecutor.scala @@ -20,6 +20,7 @@ import java.util.concurrent._ import com.comcast.money.core.internal.{ MDCSupport, SpanLocal } import com.comcast.money.core.logging.TraceLogging +import com.comcast.money.core.state.State import org.slf4j.MDC object TraceFriendlyThreadPoolExecutor { @@ -64,28 +65,19 @@ class TraceFriendlyThreadPoolExecutor(corePoolSize: Int, maximumPoolSize: Int, k setRejectedExecutionHandler(rejectedExecutionHandler) } - override def execute(command: Runnable) = { - val inherited = SpanLocal.current - val submittingThreadsContext = MDC.getCopyOfContextMap + override def execute(command: Runnable) { + val state = State.capture() - super.execute( - new Runnable { - override def run = { - mdcSupport.propogateMDC(Option(submittingThreadsContext)) - SpanLocal.clear() - inherited.foreach(SpanLocal.push) - try { - command.run() - } catch { - case t: Throwable => - logException(t) - throw t - } finally { - SpanLocal.clear() - MDC.clear() - } + super.execute(new Runnable { + override def run(): Unit = state.restore { + try { + command.run() + } catch { + case t: Throwable => + logException(t) + throw t } } - ) + }) } } diff --git a/money-core/src/main/scala/com/comcast/money/core/state/RestoredState.scala b/money-core/src/main/scala/com/comcast/money/core/state/RestoredState.scala new file mode 100644 index 00000000..0b0c42f8 --- /dev/null +++ b/money-core/src/main/scala/com/comcast/money/core/state/RestoredState.scala @@ -0,0 +1,23 @@ +/* + * Copyright 2012-2015 Comcast Cable Communications Management, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.comcast.money.core.state + +/** Represents the restored tracing state. */ +trait RestoredState extends AutoCloseable { + /** Reverts the restored tracing state on the current thread */ + override def close(): Unit +} diff --git a/money-core/src/main/scala/com/comcast/money/core/state/State.scala b/money-core/src/main/scala/com/comcast/money/core/state/State.scala new file mode 100644 index 00000000..9a1b76b4 --- /dev/null +++ b/money-core/src/main/scala/com/comcast/money/core/state/State.scala @@ -0,0 +1,105 @@ +/* + * Copyright 2012-2015 Comcast Cable Communications Management, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.comcast.money.core.state + +import com.comcast.money.core.internal.{ MDCSupport, SpanLocal } +import org.slf4j.MDC + +object State { + private lazy val mdcSupport = new MDCSupport() + + /** + * Captures the state of the current tracing span so that it can be restored onto + * a separate worker thread. + * + * {{{ + * import com.comcast.money.core.state + * + * def doSomethingAsynchronous(executor: ExecutorService) { + * val capturedState = State.capture() + * + * executor.submit(new Runnable { + * override def run(): Unit = state.restore { + * // resumes on captured + * } + * }) + * } + * }}} + * @return the captured tracing state + */ + def capture(): State = { + val span = SpanLocal.current + val mdc = MDC.getCopyOfContextMap + + new State { + override def restore(): RestoredState = { + mdcSupport.propogateMDC(Option(mdc)) + SpanLocal.clear() + span.foreach(SpanLocal.push) + new RestoredState { + override def close(): Unit = { + MDC.clear() + SpanLocal.clear() + } + } + } + } + } +} + +/** Represents the tracing state that has been captured from another thread */ +trait State { + /** + * Restores the tracing state on the current thread returning a [[RestoredState]] + * that can be used with Java 7+ "try-with-resources" syntax to revert the + * restored state. + * + * {{{ + * State state = State.capture(); + * // later + * try (RestoredState restored = state.restore()) { + * // do something meaningful here + * } + * }}} + * @return the restored state + */ + def restore(): RestoredState + + /** + * Restores the tracing state on the current thread for the duration of the + * function. + * + * {{{ + * val state = State.capture + * // later + * state.restore { + * // do something meaningful here + * } + * }}} + * @param f the function to invoke + * @tparam T the return value of the function + * @return the value returned from the function + */ + def restore[T](f: => T): T = { + val restoredState = restore() + try { + f + } finally { + if (restoredState != null) restoredState.close() + } + } +} diff --git a/money-http-async-client/src/main/resources/reference.conf b/money-http-async-client/src/main/resources/reference.conf new file mode 100644 index 00000000..2622253c --- /dev/null +++ b/money-http-async-client/src/main/resources/reference.conf @@ -0,0 +1,24 @@ +# Copyright 2012-2015 Comcast Cable Communications Management, LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +money { + http-client { + metric-names { + http-call-duration = "http-call-duration" + http-call-with-body-duration = "http-call-with-body-duration" + http-process-response-duration = "http-process-response-duration" + http-response-code = "http-response-code" + } + } +} diff --git a/money-http-async-client/src/main/scala/com/comcast/money/http/client/HttpAsyncTraceConfig.scala b/money-http-async-client/src/main/scala/com/comcast/money/http/client/HttpAsyncTraceConfig.scala new file mode 100644 index 00000000..3db8f7ed --- /dev/null +++ b/money-http-async-client/src/main/scala/com/comcast/money/http/client/HttpAsyncTraceConfig.scala @@ -0,0 +1,24 @@ +/* + * Copyright 2012-2015 Comcast Cable Communications Management, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.comcast.money.http.client + +object HttpAsyncTraceConfig { + lazy val HttpResponseTimeTraceKey: String = "http-call-duration" + lazy val HttpFullResponseTimeTraceKey: String = "http-call-with-body-duration" + lazy val ProcessResponseTimeTraceKey: String = "http-process-response-duration" + lazy val HttpResponseCodeTraceKey: String = "http-response-code" +} diff --git a/money-http-async-client/src/main/scala/com/comcast/money/http/client/TraceFriendlyHttpAsyncClient.scala b/money-http-async-client/src/main/scala/com/comcast/money/http/client/TraceFriendlyHttpAsyncClient.scala new file mode 100644 index 00000000..df3ce462 --- /dev/null +++ b/money-http-async-client/src/main/scala/com/comcast/money/http/client/TraceFriendlyHttpAsyncClient.scala @@ -0,0 +1,262 @@ +/* + * Copyright 2012-2015 Comcast Cable Communications Management, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.comcast.money.http.client + +import java.io.Closeable +import java.util.concurrent.{ CancellationException, Future } + +import com.comcast.money.api.Span +import com.comcast.money.core.{ Formatters, Money, Tracer } +import org.apache.http.protocol.HttpContext +import org.apache.http.{ HttpHost, HttpRequest, HttpResponse } +import com.comcast.money.core.internal.SpanLocal +import com.comcast.money.core.state.State +import org.apache.http.client.methods.HttpUriRequest +import org.apache.http.concurrent.FutureCallback +import org.apache.http.impl.nio.client.CloseableHttpAsyncClient +import org.apache.http.nio.{ ContentDecoder, ContentEncoder, IOControl } +import org.apache.http.nio.client.HttpAsyncClient +import org.apache.http.nio.protocol.{ HttpAsyncRequestProducer, HttpAsyncResponseConsumer } + +import scala.util.{ Failure, Try } + +object TraceFriendlyHttpAsyncSupport { + def wrapExecute( + httpRequest: HttpRequest, + callback: FutureCallback[HttpResponse], + tracer: Tracer + )(f: FutureCallback[HttpResponse] => Future[HttpResponse]): Future[HttpResponse] = { + // capture the current tracing state... + val state = State.capture() + + // Put the X-MoneyTrace header in the request... + TraceFriendlyHttpAsyncSupport.addTraceHeader(Option(httpRequest), SpanLocal.current) + + // Start timing the execution of the request... + tracer.startTimer(HttpAsyncTraceConfig.HttpResponseTimeTraceKey) + + // Wrap the callback to intercept the response... + val tracingCallback = new TracingFutureCallback[HttpResponse](Option(callback), state, response => { + // Stop timing the execution of the request... + tracer.stopTimer(HttpAsyncTraceConfig.HttpResponseTimeTraceKey) + + // Get the response code, will be 0 if response is null + val responseCode = getResponseCode(response) + tracer.record(HttpAsyncTraceConfig.HttpResponseCodeTraceKey, responseCode) + }) + + // Continue with the execution of the request... + f(tracingCallback) + } + + def wrapExecute[T]( + requestProducer: HttpAsyncRequestProducer, + responseConsumer: HttpAsyncResponseConsumer[T], + callback: FutureCallback[T], + tracer: Tracer + )(f: (HttpAsyncRequestProducer, HttpAsyncResponseConsumer[T], FutureCallback[T]) => Future[T]): Future[T] = { + // capture the current tracing state... + val state = State.capture() + val span = SpanLocal.current + + // Wrap the producer interface to intercept the request... + val tracingRequestProducer = new TracingHttpAsyncRequestProducer(requestProducer, state, (httpRequest: HttpRequest) => { + // Put the X-MoneyTrace header in the request... + TraceFriendlyHttpAsyncSupport.addTraceHeader(Option(httpRequest), span) + + // Start timing the execution of the request... + tracer.startTimer(HttpAsyncTraceConfig.HttpResponseTimeTraceKey) + httpRequest + }) + + // Wrap the consumer interface to intercept the response... + val tracingResponseConsumer = new TracingHttpAsyncResponseConsumer[T](responseConsumer, state, (httpResponse: Try[HttpResponse]) => { + // Stop timing the execution of the request... + tracer.stopTimer(HttpAsyncTraceConfig.HttpResponseTimeTraceKey) + + // Get the response code, will be 0 if response is null + val responseCode = getResponseCode(httpResponse) + tracer.record(HttpAsyncTraceConfig.HttpResponseCodeTraceKey, responseCode) + httpResponse + }) + + // Wrap the callback interface to restore the tracing state on the callback thread... + val tracingCallback = new TracingFutureCallback[T](Option(callback), state, _ => {}) + + // Continue with the execution of the request... + f(tracingRequestProducer, tracingResponseConsumer, tracingCallback) + } + + def addTraceHeader(httpRequest: Option[HttpRequest], currentSpan: Option[Span]) { + (httpRequest, currentSpan) match { + case (Some(request), Some(span)) => request.setHeader("X-MoneyTrace", Formatters.toHttpHeader(span.info.id)) + } + } + + def getResponseCode(response: Try[HttpResponse]): Int = + response map (_.getStatusLine) map (_.getStatusCode) getOrElse 0 +} + +/** + * Provides a thin wrapper around [[HttpAsyncClient]] to support automatically tracing + * requests and restoring the tracing state on the callback interfaces. + */ +class TraceFriendlyHttpAsyncClient(wrappee: HttpAsyncClient) extends HttpAsyncClient + with java.io.Closeable { + + import com.comcast.money.http.client.TraceFriendlyHttpAsyncSupport._ + + val tracer = Money.Environment.tracer + + override def execute[T]( + requestProducer: HttpAsyncRequestProducer, + responseConsumer: HttpAsyncResponseConsumer[T], + context: HttpContext, + callback: FutureCallback[T] + ): Future[T] = + wrapExecute(requestProducer, responseConsumer, callback, tracer) { + (tracingRequestProducer, tracingResponseConsumer, tracingCallback) => wrappee.execute(tracingRequestProducer, tracingResponseConsumer, context, tracingCallback) + } + + override def execute[T]( + requestProducer: HttpAsyncRequestProducer, + responseConsumer: HttpAsyncResponseConsumer[T], + callback: FutureCallback[T] + ): Future[T] = + wrapExecute(requestProducer, responseConsumer, callback, tracer) { + (tracingRequestProducer, tracingResponseConsumer, tracingCallback) => wrappee.execute(tracingRequestProducer, tracingResponseConsumer, tracingCallback) + } + + override def execute( + target: HttpHost, + request: HttpRequest, + context: HttpContext, + callback: FutureCallback[HttpResponse] + ): Future[HttpResponse] = + wrapExecute(request, callback, tracer) { + tracingCallback => wrappee.execute(target, request, context, tracingCallback) + } + + override def execute( + target: HttpHost, + request: HttpRequest, + callback: FutureCallback[HttpResponse] + ): Future[HttpResponse] = + wrapExecute(request, callback, tracer) { + tracingCallback => wrappee.execute(target, request, tracingCallback) + } + + override def execute( + request: HttpUriRequest, + context: HttpContext, + callback: FutureCallback[HttpResponse] + ): Future[HttpResponse] = + wrapExecute(request, callback, tracer) { + tracingCallback => wrappee.execute(request, context, tracingCallback) + } + + override def execute( + request: HttpUriRequest, + callback: FutureCallback[HttpResponse] + ): Future[HttpResponse] = + wrapExecute(request, callback, tracer) { + tracingCallback => wrappee.execute(request, tracingCallback) + } + + override def close(): Unit = { + wrappee match { + case closeable: CloseableHttpAsyncClient => + closeable.close() + case closeable: Closeable => + closeable.close() + case closeable: AutoCloseable => + closeable.close() + case _ => + } + } +} + +/** + * Wraps the [[FutureCallback]] callback interface and restores the captured tracing + * state before invoking any of the wrapped methods. + */ +class TracingFutureCallback[T](wrappee: Option[FutureCallback[T]], state: State, f: Try[T] => Unit) extends FutureCallback[T] { + override def failed(ex: Exception): Unit = state.restore { + f(Failure(ex)) + wrappee.foreach(_.failed(ex)) + } + + override def completed(result: T): Unit = state.restore { + f(Try(result)) + wrappee.foreach(_.completed(result)) + } + + override def cancelled(): Unit = state.restore { + f(Failure(new CancellationException())) + wrappee.foreach(_.cancelled()) + } +} + +/** + * Wraps the [[HttpAsyncRequestProducer]] interface to intercept the [[HttpRequest]]. + */ +class TracingHttpAsyncRequestProducer(wrappee: HttpAsyncRequestProducer, state: State, f: HttpRequest => HttpRequest) extends HttpAsyncRequestProducer { + override def generateRequest(): HttpRequest = state.restore { + f(wrappee.generateRequest()) + } + + override def requestCompleted(context: HttpContext): Unit = state.restore { + wrappee.requestCompleted(context) + } + + override def failed(ex: Exception): Unit = state.restore { + wrappee.failed(ex) + } + + override def produceContent(encoder: ContentEncoder, ioctrl: IOControl): Unit = wrappee.produceContent(encoder, ioctrl) + override def isRepeatable: Boolean = wrappee.isRepeatable + override def getTarget: HttpHost = wrappee.getTarget + override def resetRequest(): Unit = wrappee.resetRequest() + override def close(): Unit = wrappee.close() +} + +/** + * Wraps the [[HttpAsyncResponseConsumer]] interface to intercept the completion of the + * HTTP request, handle the [[HttpResponse]] and to restore the captured tracing state. + */ +class TracingHttpAsyncResponseConsumer[T](wrappee: HttpAsyncResponseConsumer[T], state: State, f: Try[HttpResponse] => Unit) extends HttpAsyncResponseConsumer[T] { + override def responseReceived(response: HttpResponse): Unit = state.restore { + f(Try(response)) + wrappee.responseReceived(response) + } + + override def responseCompleted(context: HttpContext): Unit = state.restore { + wrappee.responseCompleted(context) + } + + override def failed(ex: Exception): Unit = state.restore { + f(Failure(ex)) + wrappee.failed(ex) + } + + override def consumeContent(decoder: ContentDecoder, ioctrl: IOControl): Unit = wrappee.consumeContent(decoder, ioctrl) + override def isDone: Boolean = wrappee.isDone + override def getResult: T = wrappee.getResult + override def getException: Exception = wrappee.getException + override def cancel(): Boolean = wrappee.cancel() + override def close(): Unit = wrappee.close() +} \ No newline at end of file diff --git a/money-http-async-client/src/test/resources/application.conf b/money-http-async-client/src/test/resources/application.conf new file mode 100644 index 00000000..d2f1780b --- /dev/null +++ b/money-http-async-client/src/test/resources/application.conf @@ -0,0 +1,47 @@ +# Copyright 2012-2015 Comcast Cable Communications Management, LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +money { + enabled = true + mdc.enabled = true + application-name = "unknown" + log-exceptions = false + + handling = { + async = true + handlers = [ + { + class = "com.comcast.money.core.LogRecorderSpanHandler" + log-level = "INFO" + formatting { + span-start = "Span: " + null-value = "NULL" + log-template = "[ %s=%s ]" + span-duration-ms-enabled = "false" + keys { + span-id = "span-id" + trace-id = "trace-id" + parent-id = "parent-id" + span-name = "span-name" + app-name = "app-name" + start-time = "start-time" + span-duration = "span-duration" + span-duration-ms = "span-duration-ms" + span-success = "span-success" + } + } + } + ] + } +} diff --git a/money-http-async-client/src/test/scala/com/comcast/money/http/client/TraceFriendlyHttpAsyncClientSpec.scala b/money-http-async-client/src/test/scala/com/comcast/money/http/client/TraceFriendlyHttpAsyncClientSpec.scala new file mode 100644 index 00000000..6a6a4ccc --- /dev/null +++ b/money-http-async-client/src/test/scala/com/comcast/money/http/client/TraceFriendlyHttpAsyncClientSpec.scala @@ -0,0 +1,302 @@ +/* + * Copyright 2012-2015 Comcast Cable Communications Management, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.comcast.money.http.client + +import java.io.Closeable +import java.util.concurrent.{ CompletableFuture, Executors, Future, TimeUnit } + +import com.comcast.money.api.SpanId +import com.comcast.money.core.{ SpecHelpers, Tracer, Formatters => CoreSpanId } +import com.comcast.money.core.internal.SpanLocal +import org.apache.http.client.methods.{ CloseableHttpResponse, HttpUriRequest } +import org.apache.http.concurrent.FutureCallback +import org.apache.http.impl.nio.client.CloseableHttpAsyncClient +import org.apache.http.nio.{ ContentDecoder, ContentEncoder, IOControl } +import org.apache.http.nio.client.HttpAsyncClient +import org.apache.http.nio.protocol.{ HttpAsyncRequestProducer, HttpAsyncResponseConsumer } +import org.apache.http.protocol.HttpContext +import org.apache.http.{ HttpHost, HttpResponse, StatusLine } +import org.mockito.ArgumentCaptor +import org.mockito.Mockito._ +import org.mockito.Matchers.{ eq => argEq } +import org.mockito.invocation.InvocationOnMock +import org.mockito.stubbing.Answer +import org.scalatest._ +import org.scalatest.mock.MockitoSugar + +class TraceFriendlyHttpAsyncClientSpec extends WordSpec with SpecHelpers + with Matchers with MockitoSugar with OneInstancePerTest with BeforeAndAfterEach { + + val httpClient = mock[CloseableHttpAsyncClient] + val httpUriRequest = mock[HttpUriRequest] + val httpResponse = mock[CloseableHttpResponse] + val statusLine = mock[StatusLine] + val httpHost = new HttpHost("localhost") + val httpContext = mock[HttpContext] + val callback = mock[FutureCallback[HttpResponse]] + val requestProducer = mock[HttpAsyncRequestProducer] + val responseConsumer = mock[HttpAsyncResponseConsumer[HttpResponse]] + + val spanId = new SpanId() + val future = new CompletableFuture[HttpResponse]() + + val callbackCaptor = ArgumentCaptor.forClass(classOf[FutureCallback[HttpResponse]]) + val producerCaptor = ArgumentCaptor.forClass(classOf[HttpAsyncRequestProducer]) + val consumerCaptor = ArgumentCaptor.forClass(classOf[HttpAsyncResponseConsumer[HttpResponse]]) + + val executor = Executors.newScheduledThreadPool(1) + + var callbackSpanId: Option[SpanId] = None + + when(httpResponse.getStatusLine).thenReturn(statusLine) + when(statusLine.getStatusCode).thenReturn(200) + + // extend what we are testing so we can use a mock tracer + val underTest = new TraceFriendlyHttpAsyncClient(httpClient) { + override val tracer = mock[Tracer] + } + + override def beforeEach(): Unit = { + SpanLocal.push(testSpan(spanId)) + doReturn(httpUriRequest).when(requestProducer).generateRequest() + doReturn(httpResponse).when(responseConsumer).getResult + + doAnswer(new Answer[Void] { + override def answer(invocation: InvocationOnMock): Void = { + // copy span ID of current thread to field to match in test + callbackSpanId = SpanLocal.current.map(_.info.id) + null + } + }).when(callback).completed(httpResponse) + } + + // if you don't reset, then the verifies are going to be off + override def afterEach() = { + reset(underTest.tracer, httpUriRequest, httpClient) + SpanLocal.clear() + } + + def verifyTracing() = { + verify(underTest.tracer).startTimer(HttpAsyncTraceConfig.HttpResponseTimeTraceKey) + verify(underTest.tracer).stopTimer(HttpAsyncTraceConfig.HttpResponseTimeTraceKey) + verify(underTest.tracer).record("http-response-code", 200L) + verify(httpUriRequest).setHeader("X-MoneyTrace", CoreSpanId.toHttpHeader(spanId)) + assert(callbackSpanId.contains(spanId)) + } + + def completeCallback(): Unit = { + val callback = callbackCaptor.getValue + val runnable = new Runnable { + override def run(): Unit = { + callback.completed(httpResponse) + future.complete(httpResponse) + } + } + executor.schedule(runnable, 50, TimeUnit.MILLISECONDS) + future.get() + } + + "TraceFriendlyHttpAsyncClient" should { + "record the status code and call duration when execute(HttpUriRequest, FutureCallback)" in { + underTest.execute(httpUriRequest, callback) + + verify(httpClient).execute(argEq(httpUriRequest), callbackCaptor.capture()) + completeCallback() + + verify(callback).completed(httpResponse) + verifyTracing() + } + "record the status code and call duration when execute(HttpUriRequest, HttpContext, FutureCallback)" in { + underTest.execute(httpUriRequest, httpContext, callback) + + verify(httpClient).execute(argEq(httpUriRequest), argEq(httpContext), callbackCaptor.capture()) + completeCallback() + + verify(callback).completed(httpResponse) + verifyTracing() + } + "record the status code and call duration when execute(HttpHost, HttpRequest, FutureCallback)" in { + underTest.execute(httpHost, httpUriRequest, callback) + + verify(httpClient).execute(argEq(httpHost), argEq(httpUriRequest), callbackCaptor.capture()) + completeCallback() + + verify(callback).completed(httpResponse) + verifyTracing() + } + "record the status code and call duration when execute(HttpHost, HttpRequest, HttpContext, FutureCallback)" in { + underTest.execute(httpHost, httpUriRequest, httpContext, callback) + + verify(httpClient).execute(argEq(httpHost), argEq(httpUriRequest), argEq(httpContext), callbackCaptor.capture()) + completeCallback() + + verify(callback).completed(httpResponse) + verifyTracing() + } + "record the status code and call duration when execute(HttpAsyncRequestProducer, HttpAsyncResponseConsumer, FutureCallback)" in { + underTest.execute(requestProducer, responseConsumer, callback) + + verify(httpClient).execute(producerCaptor.capture(), consumerCaptor.capture(), callbackCaptor.capture()) + producerCaptor.getValue.generateRequest() + val captured = consumerCaptor.getValue + captured.responseReceived(httpResponse) + captured.responseCompleted(httpContext) + completeCallback() + + verify(callback).completed(httpResponse) + verifyTracing() + } + "record the status code and call duration when execute(HttpAsyncRequestProducer, HttpAsyncResponseConsumer, HttpContext, FutureCallback)" in { + underTest.execute(requestProducer, responseConsumer, httpContext, callback) + + verify(httpClient).execute(producerCaptor.capture(), consumerCaptor.capture(), argEq(httpContext), callbackCaptor.capture()) + producerCaptor.getValue.generateRequest() + val capturedConsumer = consumerCaptor.getValue + capturedConsumer.responseReceived(httpResponse) + capturedConsumer.responseCompleted(httpContext) + completeCallback() + + verify(callback).completed(httpResponse) + verifyTracing() + } + "records a zero for a status code on exception" in { + underTest.execute(httpUriRequest, callback) + + verify(httpClient).execute(argEq(httpUriRequest), callbackCaptor.capture()) + callbackCaptor.getValue.failed(new RuntimeException("bad")) + + verify(underTest.tracer).record("http-response-code", 0L) + } + "wraps the HttpAsyncRequestProducer members" in { + underTest.execute(requestProducer, responseConsumer, httpContext, callback) + + verify(httpClient).execute(producerCaptor.capture(), consumerCaptor.capture(), argEq(httpContext), callbackCaptor.capture()) + + val capturedProducer = producerCaptor.getValue + verifyZeroInteractions(requestProducer) + + val exception = mock[Exception] + capturedProducer.failed(exception) + verify(requestProducer).failed(argEq(exception)) + + val contentEncoder = mock[ContentEncoder] + val ioControl = mock[IOControl] + capturedProducer.produceContent(contentEncoder, ioControl) + verify(requestProducer).produceContent(argEq(contentEncoder), argEq(ioControl)) + + val context = mock[HttpContext] + capturedProducer.requestCompleted(context) + verify(requestProducer).requestCompleted(argEq(context)) + + capturedProducer.resetRequest() + verify(requestProducer).resetRequest() + + capturedProducer.generateRequest() + verify(requestProducer).generateRequest() + + capturedProducer.getTarget + verify(requestProducer).getTarget + + capturedProducer.isRepeatable + verify(requestProducer).isRepeatable + + capturedProducer.close() + verify(requestProducer).close() + } + "wraps the HttpAsyncResponseConsumer members" in { + underTest.execute(requestProducer, responseConsumer, httpContext, callback) + + verify(httpClient).execute(producerCaptor.capture(), consumerCaptor.capture(), argEq(httpContext), callbackCaptor.capture()) + + val capturedConsumer = consumerCaptor.getValue + verifyZeroInteractions(responseConsumer) + + val contentDecoder = mock[ContentDecoder] + val ioControl = mock[IOControl] + capturedConsumer.consumeContent(contentDecoder, ioControl) + verify(responseConsumer).consumeContent(argEq(contentDecoder), argEq(ioControl)) + + val exception = mock[Exception] + capturedConsumer.failed(exception) + verify(responseConsumer).failed(argEq(exception)) + + val context = mock[HttpContext] + capturedConsumer.responseCompleted(context) + verify(responseConsumer).responseCompleted(argEq(context)) + + val response = mock[HttpResponse] + capturedConsumer.responseReceived(response) + verify(responseConsumer).responseReceived(argEq(response)) + + capturedConsumer.getException + verify(responseConsumer).getException + + capturedConsumer.getResult + verify(responseConsumer).getResult + + capturedConsumer.isDone + verify(responseConsumer).isDone + + capturedConsumer.close() + verify(responseConsumer).close() + } + "wraps the FutureCallback members" in { + underTest.execute(requestProducer, responseConsumer, httpContext, callback) + + verify(httpClient).execute(producerCaptor.capture(), consumerCaptor.capture(), argEq(httpContext), callbackCaptor.capture()) + + val capturedCallback = callbackCaptor.getValue + verifyZeroInteractions(callback) + + capturedCallback.cancelled() + verify(callback).cancelled() + + val response = mock[HttpResponse] + capturedCallback.completed(response) + verify(callback).completed(argEq(response)) + + val exception = mock[Exception] + capturedCallback.failed(exception) + verify(callback).failed(argEq(exception)) + } + "calls close on closeable http client" in { + underTest.close() + verify(httpClient).close() + } + "calls close if the http client implements closable" in { + trait Closer extends HttpAsyncClient with Closeable + + val closeHttp = mock[Closer] + val closeTest = new TraceFriendlyHttpAsyncClient(closeHttp) { + override val tracer = mock[Tracer] + } + + closeTest.close() + verify(closeHttp).close() + } + "calls close if the http client implements auto closeable" in { + trait AutoCloser extends HttpAsyncClient with AutoCloseable + val autoCloseHttp = mock[AutoCloser] + val autoCloseTest = new TraceFriendlyHttpAsyncClient(autoCloseHttp) { + override val tracer = mock[Tracer] + } + + autoCloseTest.close() + verify(autoCloseHttp).close() + } + } +} diff --git a/money-http-client/src/main/scala/com/comcast/money/http/client/TraceFriendlyHttpClient.scala b/money-http-client/src/main/scala/com/comcast/money/http/client/TraceFriendlyHttpClient.scala index acc1e5a9..887902e2 100644 --- a/money-http-client/src/main/scala/com/comcast/money/http/client/TraceFriendlyHttpClient.scala +++ b/money-http-client/src/main/scala/com/comcast/money/http/client/TraceFriendlyHttpClient.scala @@ -122,12 +122,12 @@ class TraceFriendlyHttpClient(wrapee: HttpClient) extends HttpClient with java.i } override def close(): Unit = { - if (wrapee.isInstanceOf[CloseableHttpClient]) - wrapee.asInstanceOf[CloseableHttpClient].close() - else if (wrapee.isInstanceOf[Closeable]) - wrapee.asInstanceOf[Closeable].close() - else if (wrapee.isInstanceOf[AutoCloseable]) - wrapee.asInstanceOf[AutoCloseable].close() + wrapee match { + case client: CloseableHttpClient => client.close() + case closeable: Closeable => closeable.close() + case closeable: AutoCloseable => closeable.close() + case _ => + } } } diff --git a/project/MoneyBuild.scala b/project/MoneyBuild.scala index 6c3a3c54..20938bbe 100644 --- a/project/MoneyBuild.scala +++ b/project/MoneyBuild.scala @@ -26,7 +26,7 @@ object MoneyBuild extends Build { publishLocal := {}, publish := {} ) - .aggregate(moneyApi, moneyCore, moneyAspectj, moneyHttpClient, moneyJavaServlet, moneyKafka, moneySpring, moneySpring3, moneyWire) + .aggregate(moneyApi, moneyCore, moneyAspectj, moneyHttpClient, moneyHttpAsyncClient, moneyJavaServlet, moneyKafka, moneySpring, moneySpring3, moneyWire) lazy val moneyApi = Project("money-api", file("./money-api")) @@ -88,6 +88,21 @@ object MoneyBuild extends Build { ) .dependsOn(moneyCore % "test->test;compile->compile",moneyAspectj) + lazy val moneyHttpAsyncClient = + Project("money-http-async-client", file("./money-http-async-client")) + .configs( IntegrationTest ) + .settings(aspectjProjectSettings: _*) + .settings( + libraryDependencies <++= (scalaVersion){v: String => + Seq( + apacheHttpAsyncClient, + scalaTest, + mockito + ) + } + ) + .dependsOn(moneyCore % "test->test;compile->compile",moneyAspectj) + lazy val moneyJavaServlet = Project("money-java-servlet", file("./money-java-servlet")) .configs( IntegrationTest ) @@ -284,6 +299,7 @@ object MoneyBuild extends Build { object Dependencies { val codahaleVersion = "3.0.2" val apacheHttpClientVersion = "4.3.5" + val apacheHttpAsyncClientVersion = "4.1.3" // Logging val slf4j = "org.slf4j" % "slf4j-api" % "1.7.5" @@ -305,6 +321,9 @@ object MoneyBuild extends Build { // Apache http client val apacheHttpClient = "org.apache.httpcomponents" % "httpclient" % apacheHttpClientVersion + // Apache http async client + val apacheHttpAsyncClient = "org.apache.httpcomponents" % "httpasyncclient" % apacheHttpAsyncClientVersion + // Javax servlet - note: the group id and artfacit id have changed in 3.0 val javaxServlet = "javax.servlet" % "servlet-api" % "2.5"