From 72d92420ecc1a3f6cfed8a1d62055b41b3a976a8 Mon Sep 17 00:00:00 2001 From: "Spindler, Justin" Date: Tue, 13 Feb 2018 13:51:25 -0500 Subject: [PATCH 1/9] Initial support for Apache HttpAsyncClient --- .../TraceFriendlyThreadPoolExecutor.scala | 24 ++- .../money/core/state/RestoredState.scala | 21 +++ .../com/comcast/money/core/state/State.scala | 47 ++++++ .../http/client/HttpAsyncTraceConfig.scala | 8 + .../client/TraceFriendlyHttpAsyncClient.scala | 145 ++++++++++++++++++ .../http/client/TracingFutureCallback.scala | 18 +++ .../TracingHttpAsyncRequestProducer.scala | 24 +++ .../TracingHttpAsyncResponseConsumer.scala | 34 ++++ .../http/client/TraceFriendlyHttpClient.scala | 12 +- project/MoneyBuild.scala | 19 +++ 10 files changed, 332 insertions(+), 20 deletions(-) create mode 100644 money-core/src/main/scala/com/comcast/money/core/state/RestoredState.scala create mode 100644 money-core/src/main/scala/com/comcast/money/core/state/State.scala create mode 100644 money-http-async-client/src/main/scala/com/comcast/money/http/client/HttpAsyncTraceConfig.scala create mode 100644 money-http-async-client/src/main/scala/com/comcast/money/http/client/TraceFriendlyHttpAsyncClient.scala create mode 100644 money-http-async-client/src/main/scala/com/comcast/money/http/client/TracingFutureCallback.scala create mode 100644 money-http-async-client/src/main/scala/com/comcast/money/http/client/TracingHttpAsyncRequestProducer.scala create mode 100644 money-http-async-client/src/main/scala/com/comcast/money/http/client/TracingHttpAsyncResponseConsumer.scala diff --git a/money-core/src/main/scala/com/comcast/money/core/concurrent/TraceFriendlyThreadPoolExecutor.scala b/money-core/src/main/scala/com/comcast/money/core/concurrent/TraceFriendlyThreadPoolExecutor.scala index 11d0f1f7..26d6ae3c 100644 --- a/money-core/src/main/scala/com/comcast/money/core/concurrent/TraceFriendlyThreadPoolExecutor.scala +++ b/money-core/src/main/scala/com/comcast/money/core/concurrent/TraceFriendlyThreadPoolExecutor.scala @@ -20,6 +20,7 @@ import java.util.concurrent._ import com.comcast.money.core.internal.{ MDCSupport, SpanLocal } import com.comcast.money.core.logging.TraceLogging +import com.comcast.money.core.state.State import org.slf4j.MDC object TraceFriendlyThreadPoolExecutor { @@ -65,24 +66,19 @@ class TraceFriendlyThreadPoolExecutor(corePoolSize: Int, maximumPoolSize: Int, k } override def execute(command: Runnable) = { - val inherited = SpanLocal.current - val submittingThreadsContext = MDC.getCopyOfContextMap + val state = State.capture() super.execute( new Runnable { override def run = { - mdcSupport.propogateMDC(Option(submittingThreadsContext)) - SpanLocal.clear() - inherited.foreach(SpanLocal.push) - try { - command.run() - } catch { - case t: Throwable => - logException(t) - throw t - } finally { - SpanLocal.clear() - MDC.clear() + state.restore() { + try { + command.run() + } catch { + case t: Throwable => + logException(t) + throw t + } } } } diff --git a/money-core/src/main/scala/com/comcast/money/core/state/RestoredState.scala b/money-core/src/main/scala/com/comcast/money/core/state/RestoredState.scala new file mode 100644 index 00000000..231aa037 --- /dev/null +++ b/money-core/src/main/scala/com/comcast/money/core/state/RestoredState.scala @@ -0,0 +1,21 @@ +/* + * Copyright 2012-2015 Comcast Cable Communications Management, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.comcast.money.core.state + +trait RestoredState extends AutoCloseable { + override def close(): Unit +} diff --git a/money-core/src/main/scala/com/comcast/money/core/state/State.scala b/money-core/src/main/scala/com/comcast/money/core/state/State.scala new file mode 100644 index 00000000..fc798e57 --- /dev/null +++ b/money-core/src/main/scala/com/comcast/money/core/state/State.scala @@ -0,0 +1,47 @@ +/* + * Copyright 2012-2015 Comcast Cable Communications Management, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.comcast.money.core.state + +import com.comcast.money.core.internal.{ MDCSupport, SpanLocal } +import org.slf4j.MDC + +object State { + private lazy val mdcSupport = new MDCSupport() + + def capture(): State = { + val span = SpanLocal.current + val mdc = MDC.getCopyOfContextMap + + new State { + override def restore()(f: => Unit): Unit = { + mdcSupport.propogateMDC(Option(mdc)) + SpanLocal.clear() + span.foreach(SpanLocal.push) + try { + f + } finally { + MDC.clear() + SpanLocal.clear() + } + } + } + } +} + +trait State { + def restore()(f: => Unit): Unit +} diff --git a/money-http-async-client/src/main/scala/com/comcast/money/http/client/HttpAsyncTraceConfig.scala b/money-http-async-client/src/main/scala/com/comcast/money/http/client/HttpAsyncTraceConfig.scala new file mode 100644 index 00000000..e506afd7 --- /dev/null +++ b/money-http-async-client/src/main/scala/com/comcast/money/http/client/HttpAsyncTraceConfig.scala @@ -0,0 +1,8 @@ +package com.comcast.money.http.client + +object HttpAsyncTraceConfig { + lazy val HttpResponseTimeTraceKey: String = "http-call-duration" + lazy val HttpFullResponseTimeTraceKey: String = "http-call-with-body-duration" + lazy val ProcessResponseTimeTraceKey: String = "http-process-response-duration" + lazy val HttpResponseCodeTraceKey: String = "http-response-code" +} \ No newline at end of file diff --git a/money-http-async-client/src/main/scala/com/comcast/money/http/client/TraceFriendlyHttpAsyncClient.scala b/money-http-async-client/src/main/scala/com/comcast/money/http/client/TraceFriendlyHttpAsyncClient.scala new file mode 100644 index 00000000..ca0de85f --- /dev/null +++ b/money-http-async-client/src/main/scala/com/comcast/money/http/client/TraceFriendlyHttpAsyncClient.scala @@ -0,0 +1,145 @@ +/* + * Copyright 2012-2015 Comcast Cable Communications Management, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.comcast.money.http.client + +import java.io.Closeable +import java.util.concurrent.{CancellationException, Future} + +import com.comcast.money.api.Span +import com.comcast.money.core.{Formatters, Money, Tracer} +import org.apache.http.protocol.HttpContext +import org.apache.http.{HttpHost, HttpRequest, HttpResponse} +import com.comcast.money.core.internal.SpanLocal +import com.comcast.money.core.state.State +import org.apache.http.client.methods.HttpUriRequest +import org.apache.http.concurrent.FutureCallback +import org.apache.http.impl.nio.client.CloseableHttpAsyncClient +import org.apache.http.nio.client.HttpAsyncClient +import org.apache.http.nio.protocol.{HttpAsyncRequestProducer, HttpAsyncResponseConsumer} + +import scala.util.Try + +object TraceFriendlyHttpAsyncSupport { + def wrapExecute(httpRequest: HttpRequest, callback: FutureCallback[HttpResponse], tracer: Tracer)(f: FutureCallback[HttpResponse] => Future[HttpResponse]): Future[HttpResponse] = { + val state = State.capture() + + TraceFriendlyHttpAsyncSupport.addTraceHeader(Option(httpRequest), SpanLocal.current) + + val cb = new TracingFutureHttpResponseCallback(Option(callback), state, (response: Try[HttpResponse]) => { + tracer.stopTimer(HttpAsyncTraceConfig.HttpResponseTimeTraceKey) + val responseCode = getResponseCode(response) + tracer.record(HttpAsyncTraceConfig.HttpResponseCodeTraceKey, responseCode) + }) + + tracer.startTimer(HttpAsyncTraceConfig.HttpResponseTimeTraceKey) + + f(cb) + } + + def wrapExecute[T](requestProducer: HttpAsyncRequestProducer, responseConsumer: HttpAsyncResponseConsumer[T], callback: FutureCallback[T], tracer: Tracer)(f: (HttpAsyncRequestProducer, HttpAsyncResponseConsumer[T], FutureCallback[T]) => Future[T]): Future[T] = { + val state = State.capture() + val span = SpanLocal.current + val p = new TracingHttpAsyncRequestProducer(requestProducer, (httpRequest: HttpRequest) => { + TraceFriendlyHttpAsyncSupport.addTraceHeader(Option(httpRequest), span) + tracer.startTimer(HttpAsyncTraceConfig.HttpResponseTimeTraceKey) + httpRequest + }) + val c = new TracingHttpAsyncResponseConsumer[T](responseConsumer, (httpResponse: Try[HttpResponse]) => { + tracer.stopTimer(HttpAsyncTraceConfig.HttpResponseTimeTraceKey) + val responseCode = getResponseCode(httpResponse) + tracer.record(HttpAsyncTraceConfig.HttpResponseCodeTraceKey, responseCode) + httpResponse + }) + val cb = new TracingFutureCallback[T](Option(callback), state) + f(p, c, cb) + } + + def addTraceHeader(httpRequest: Option[HttpRequest], currentSpan: Option[Span]) { + (httpRequest, currentSpan) match { + case (Some(r), Some(s)) => r.setHeader("X-MoneyTrace", Formatters.toHttpHeader(s.info.id)) + } + } + + def getResponseCode(response: Try[HttpResponse]): Int = { + response map(_.getStatusLine) map(_.getStatusCode) getOrElse 0 + } +} + +class TraceFriendlyHttpAsyncClient(wrappee : HttpAsyncClient) extends HttpAsyncClient with java.io.Closeable { + import com.comcast.money.http.client.TraceFriendlyHttpAsyncSupport._ + + private val tracer = Money.Environment.tracer + + override def execute[T](requestProducer: HttpAsyncRequestProducer, responseConsumer: HttpAsyncResponseConsumer[T], context: HttpContext, callback: FutureCallback[T]): Future[T] = + wrapExecute(requestProducer, responseConsumer, callback, tracer) { + (p, c, cb) => wrappee.execute(p, c, context, cb) + } + + override def execute[T](requestProducer: HttpAsyncRequestProducer, responseConsumer: HttpAsyncResponseConsumer[T], callback: FutureCallback[T]): Future[T] = + wrapExecute(requestProducer, responseConsumer, callback, tracer) { + (p, c, cb) => wrappee.execute(p, c, cb) + } + + override def execute(target: HttpHost, request: HttpRequest, context: HttpContext, callback: FutureCallback[HttpResponse]): Future[HttpResponse] = + wrapExecute(request, callback, tracer) { + cb => wrappee.execute(target, request, context, cb) + } + + override def execute(target: HttpHost, request: HttpRequest, callback: FutureCallback[HttpResponse]): Future[HttpResponse] = + wrapExecute(request, callback, tracer) { + cb => wrappee.execute(target, request, cb) + } + + override def execute(request: HttpUriRequest, context: HttpContext, callback: FutureCallback[HttpResponse]): Future[HttpResponse] = + wrapExecute(request, callback, tracer) { + cb => wrappee.execute(request, context, cb) + } + + override def execute(request: HttpUriRequest, callback: FutureCallback[HttpResponse]): Future[HttpResponse] = + wrapExecute(request, callback, tracer) { + cb => wrappee.execute(request, cb) + } + + override def close(): Unit = { + wrappee match { + case closeable: CloseableHttpAsyncClient => + closeable.close() + case closeable: Closeable => + closeable.close() + case closeable: AutoCloseable => + closeable.close() + case _ => + } + } +} + +class TracingFutureHttpResponseCallback(wrappee: Option[FutureCallback[HttpResponse]], state: State, f: Try[HttpResponse] => Unit) extends FutureCallback[HttpResponse] { + override def failed(ex: Exception): Unit = state.restore() { + f(Try(ex)) + wrappee.foreach(_.failed(ex)) + } + + override def completed(result: HttpResponse): Unit = state.restore() { + f(Try(result)) + wrappee.foreach(_.completed(result)) + } + + override def cancelled(): Unit = state.restore() { + f(Try(new CancellationException())) + wrappee.foreach(_.cancelled()) + } +} \ No newline at end of file diff --git a/money-http-async-client/src/main/scala/com/comcast/money/http/client/TracingFutureCallback.scala b/money-http-async-client/src/main/scala/com/comcast/money/http/client/TracingFutureCallback.scala new file mode 100644 index 00000000..4d2fd2bc --- /dev/null +++ b/money-http-async-client/src/main/scala/com/comcast/money/http/client/TracingFutureCallback.scala @@ -0,0 +1,18 @@ +package com.comcast.money.http.client + +import com.comcast.money.core.state.State +import org.apache.http.concurrent.FutureCallback + +class TracingFutureCallback[T](wrappee: Option[FutureCallback[T]], state: State) extends FutureCallback[T] { + override def failed(ex: Exception): Unit = state.restore() { + wrappee.foreach(_.failed(ex)) + } + + override def completed(result: T): Unit = state.restore() { + wrappee.foreach(_.completed(result)) + } + + override def cancelled(): Unit = state.restore() { + wrappee.foreach(_.cancelled()) + } +} diff --git a/money-http-async-client/src/main/scala/com/comcast/money/http/client/TracingHttpAsyncRequestProducer.scala b/money-http-async-client/src/main/scala/com/comcast/money/http/client/TracingHttpAsyncRequestProducer.scala new file mode 100644 index 00000000..44d3752e --- /dev/null +++ b/money-http-async-client/src/main/scala/com/comcast/money/http/client/TracingHttpAsyncRequestProducer.scala @@ -0,0 +1,24 @@ +package com.comcast.money.http.client + +import org.apache.http.{HttpHost, HttpRequest} +import org.apache.http.nio.{ContentEncoder, IOControl} +import org.apache.http.nio.protocol.HttpAsyncRequestProducer +import org.apache.http.protocol.HttpContext + +class TracingHttpAsyncRequestProducer(wrappee: HttpAsyncRequestProducer, f: HttpRequest => HttpRequest) extends HttpAsyncRequestProducer { + override def resetRequest(): Unit = wrappee.resetRequest() + + override def requestCompleted(context: HttpContext): Unit = wrappee.requestCompleted(context) + + override def failed(ex: Exception): Unit = wrappee.failed(ex) + + override def generateRequest(): HttpRequest = f(wrappee.generateRequest()) + + override def isRepeatable: Boolean = wrappee.isRepeatable + + override def getTarget: HttpHost = wrappee.getTarget + + override def produceContent(encoder: ContentEncoder, ioctrl: IOControl): Unit = wrappee.produceContent(encoder, ioctrl) + + override def close(): Unit = wrappee.close() +} diff --git a/money-http-async-client/src/main/scala/com/comcast/money/http/client/TracingHttpAsyncResponseConsumer.scala b/money-http-async-client/src/main/scala/com/comcast/money/http/client/TracingHttpAsyncResponseConsumer.scala new file mode 100644 index 00000000..6bbabf30 --- /dev/null +++ b/money-http-async-client/src/main/scala/com/comcast/money/http/client/TracingHttpAsyncResponseConsumer.scala @@ -0,0 +1,34 @@ +package com.comcast.money.http.client + +import org.apache.http.HttpResponse +import org.apache.http.nio.{ContentDecoder, IOControl} +import org.apache.http.nio.protocol.HttpAsyncResponseConsumer +import org.apache.http.protocol.HttpContext + +import scala.util.Try + +class TracingHttpAsyncResponseConsumer[T](wrappee: HttpAsyncResponseConsumer[T], f: Try[HttpResponse] => Unit) extends HttpAsyncResponseConsumer[T] { + override def getException: Exception = wrappee.getException + + override def failed(ex: Exception): Unit = { + f(Try(ex)) + wrappee.failed(ex) + } + + override def responseReceived(response: HttpResponse): Unit = { + f(Try(response)) + wrappee.responseReceived(response) + } + + override def isDone: Boolean = wrappee.isDone + + override def responseCompleted(context: HttpContext): Unit = wrappee.responseCompleted(context) + + override def getResult: T = wrappee.getResult + + override def consumeContent(decoder: ContentDecoder, ioctrl: IOControl): Unit = wrappee.consumeContent(decoder, ioctrl) + + override def cancel(): Boolean = wrappee.cancel() + + override def close(): Unit = wrappee.close() +} \ No newline at end of file diff --git a/money-http-client/src/main/scala/com/comcast/money/http/client/TraceFriendlyHttpClient.scala b/money-http-client/src/main/scala/com/comcast/money/http/client/TraceFriendlyHttpClient.scala index acc1e5a9..887902e2 100644 --- a/money-http-client/src/main/scala/com/comcast/money/http/client/TraceFriendlyHttpClient.scala +++ b/money-http-client/src/main/scala/com/comcast/money/http/client/TraceFriendlyHttpClient.scala @@ -122,12 +122,12 @@ class TraceFriendlyHttpClient(wrapee: HttpClient) extends HttpClient with java.i } override def close(): Unit = { - if (wrapee.isInstanceOf[CloseableHttpClient]) - wrapee.asInstanceOf[CloseableHttpClient].close() - else if (wrapee.isInstanceOf[Closeable]) - wrapee.asInstanceOf[Closeable].close() - else if (wrapee.isInstanceOf[AutoCloseable]) - wrapee.asInstanceOf[AutoCloseable].close() + wrapee match { + case client: CloseableHttpClient => client.close() + case closeable: Closeable => closeable.close() + case closeable: AutoCloseable => closeable.close() + case _ => + } } } diff --git a/project/MoneyBuild.scala b/project/MoneyBuild.scala index 6c3a3c54..3a664d2f 100644 --- a/project/MoneyBuild.scala +++ b/project/MoneyBuild.scala @@ -88,6 +88,21 @@ object MoneyBuild extends Build { ) .dependsOn(moneyCore % "test->test;compile->compile",moneyAspectj) + lazy val moneyHttpAsyncClient = + Project("money-http-async-client", file("./money-http-async-client")) + .configs( IntegrationTest ) + .settings(aspectjProjectSettings: _*) + .settings( + libraryDependencies <++= (scalaVersion){v: String => + Seq( + apacheHttpAsyncClient, + scalaTest, + mockito + ) + } + ) + .dependsOn(moneyCore % "test->test;compile->compile",moneyAspectj) + lazy val moneyJavaServlet = Project("money-java-servlet", file("./money-java-servlet")) .configs( IntegrationTest ) @@ -284,6 +299,7 @@ object MoneyBuild extends Build { object Dependencies { val codahaleVersion = "3.0.2" val apacheHttpClientVersion = "4.3.5" + val apacheHttpAsyncClientVersion = "4.1.3" // Logging val slf4j = "org.slf4j" % "slf4j-api" % "1.7.5" @@ -305,6 +321,9 @@ object MoneyBuild extends Build { // Apache http client val apacheHttpClient = "org.apache.httpcomponents" % "httpclient" % apacheHttpClientVersion + // Apache http async client + val apacheHttpAsyncClient = "org.apache.httpcomponents" % "httpasyncclient" % apacheHttpAsyncClientVersion + // Javax servlet - note: the group id and artfacit id have changed in 3.0 val javaxServlet = "javax.servlet" % "servlet-api" % "2.5" From 42b8d9c3cb318d8fd9a9ca9b0a5c19f6d74aa4b8 Mon Sep 17 00:00:00 2001 From: "Spindler, Justin" Date: Tue, 13 Feb 2018 15:00:56 -0500 Subject: [PATCH 2/9] Fix State and restore --- .../com/comcast/money/core/state/State.scala | 27 +++++++------ .../client/TraceFriendlyHttpAsyncClient.scala | 39 ++++++++++++++----- .../http/client/TracingFutureCallback.scala | 6 +-- 3 files changed, 48 insertions(+), 24 deletions(-) diff --git a/money-core/src/main/scala/com/comcast/money/core/state/State.scala b/money-core/src/main/scala/com/comcast/money/core/state/State.scala index fc798e57..9e575045 100644 --- a/money-core/src/main/scala/com/comcast/money/core/state/State.scala +++ b/money-core/src/main/scala/com/comcast/money/core/state/State.scala @@ -26,22 +26,27 @@ object State { val span = SpanLocal.current val mdc = MDC.getCopyOfContextMap - new State { - override def restore()(f: => Unit): Unit = { - mdcSupport.propogateMDC(Option(mdc)) + () => { + mdcSupport.propogateMDC(Option(mdc)) + SpanLocal.clear() + span.foreach(SpanLocal.push) + () => { + MDC.clear() SpanLocal.clear() - span.foreach(SpanLocal.push) - try { - f - } finally { - MDC.clear() - SpanLocal.clear() - } } } } } trait State { - def restore()(f: => Unit): Unit + def restore(): RestoredState + + def restore(f: => Unit): Unit = { + val restoredState = restore() + try { + f + } finally { + if (restoredState != null) restoredState.close() + } + } } diff --git a/money-http-async-client/src/main/scala/com/comcast/money/http/client/TraceFriendlyHttpAsyncClient.scala b/money-http-async-client/src/main/scala/com/comcast/money/http/client/TraceFriendlyHttpAsyncClient.scala index ca0de85f..1db1398e 100644 --- a/money-http-async-client/src/main/scala/com/comcast/money/http/client/TraceFriendlyHttpAsyncClient.scala +++ b/money-http-async-client/src/main/scala/com/comcast/money/http/client/TraceFriendlyHttpAsyncClient.scala @@ -34,7 +34,10 @@ import org.apache.http.nio.protocol.{HttpAsyncRequestProducer, HttpAsyncResponse import scala.util.Try object TraceFriendlyHttpAsyncSupport { - def wrapExecute(httpRequest: HttpRequest, callback: FutureCallback[HttpResponse], tracer: Tracer)(f: FutureCallback[HttpResponse] => Future[HttpResponse]): Future[HttpResponse] = { + def wrapExecute(httpRequest: HttpRequest, + callback: FutureCallback[HttpResponse], + tracer: Tracer) + (f: FutureCallback[HttpResponse] => Future[HttpResponse]): Future[HttpResponse] = { val state = State.capture() TraceFriendlyHttpAsyncSupport.addTraceHeader(Option(httpRequest), SpanLocal.current) @@ -50,7 +53,11 @@ object TraceFriendlyHttpAsyncSupport { f(cb) } - def wrapExecute[T](requestProducer: HttpAsyncRequestProducer, responseConsumer: HttpAsyncResponseConsumer[T], callback: FutureCallback[T], tracer: Tracer)(f: (HttpAsyncRequestProducer, HttpAsyncResponseConsumer[T], FutureCallback[T]) => Future[T]): Future[T] = { + def wrapExecute[T](requestProducer: HttpAsyncRequestProducer, + responseConsumer: HttpAsyncResponseConsumer[T], + callback: FutureCallback[T], + tracer: Tracer) + (f: (HttpAsyncRequestProducer, HttpAsyncResponseConsumer[T], FutureCallback[T]) => Future[T]): Future[T] = { val state = State.capture() val span = SpanLocal.current val p = new TracingHttpAsyncRequestProducer(requestProducer, (httpRequest: HttpRequest) => { @@ -84,27 +91,39 @@ class TraceFriendlyHttpAsyncClient(wrappee : HttpAsyncClient) extends HttpAsyncC private val tracer = Money.Environment.tracer - override def execute[T](requestProducer: HttpAsyncRequestProducer, responseConsumer: HttpAsyncResponseConsumer[T], context: HttpContext, callback: FutureCallback[T]): Future[T] = + override def execute[T](requestProducer: HttpAsyncRequestProducer, + responseConsumer: HttpAsyncResponseConsumer[T], + context: HttpContext, + callback: FutureCallback[T]): Future[T] = wrapExecute(requestProducer, responseConsumer, callback, tracer) { (p, c, cb) => wrappee.execute(p, c, context, cb) } - override def execute[T](requestProducer: HttpAsyncRequestProducer, responseConsumer: HttpAsyncResponseConsumer[T], callback: FutureCallback[T]): Future[T] = + override def execute[T](requestProducer: HttpAsyncRequestProducer, + responseConsumer: HttpAsyncResponseConsumer[T], + callback: FutureCallback[T]): Future[T] = wrapExecute(requestProducer, responseConsumer, callback, tracer) { (p, c, cb) => wrappee.execute(p, c, cb) } - override def execute(target: HttpHost, request: HttpRequest, context: HttpContext, callback: FutureCallback[HttpResponse]): Future[HttpResponse] = + override def execute(target: HttpHost, + request: HttpRequest, + context: HttpContext, + callback: FutureCallback[HttpResponse]): Future[HttpResponse] = wrapExecute(request, callback, tracer) { cb => wrappee.execute(target, request, context, cb) } - override def execute(target: HttpHost, request: HttpRequest, callback: FutureCallback[HttpResponse]): Future[HttpResponse] = + override def execute(target: HttpHost, + request: HttpRequest, + callback: FutureCallback[HttpResponse]): Future[HttpResponse] = wrapExecute(request, callback, tracer) { cb => wrappee.execute(target, request, cb) } - override def execute(request: HttpUriRequest, context: HttpContext, callback: FutureCallback[HttpResponse]): Future[HttpResponse] = + override def execute(request: HttpUriRequest, + context: HttpContext, + callback: FutureCallback[HttpResponse]): Future[HttpResponse] = wrapExecute(request, callback, tracer) { cb => wrappee.execute(request, context, cb) } @@ -128,17 +147,17 @@ class TraceFriendlyHttpAsyncClient(wrappee : HttpAsyncClient) extends HttpAsyncC } class TracingFutureHttpResponseCallback(wrappee: Option[FutureCallback[HttpResponse]], state: State, f: Try[HttpResponse] => Unit) extends FutureCallback[HttpResponse] { - override def failed(ex: Exception): Unit = state.restore() { + override def failed(ex: Exception): Unit = state.restore { f(Try(ex)) wrappee.foreach(_.failed(ex)) } - override def completed(result: HttpResponse): Unit = state.restore() { + override def completed(result: HttpResponse): Unit = state.restore { f(Try(result)) wrappee.foreach(_.completed(result)) } - override def cancelled(): Unit = state.restore() { + override def cancelled(): Unit = state.restore { f(Try(new CancellationException())) wrappee.foreach(_.cancelled()) } diff --git a/money-http-async-client/src/main/scala/com/comcast/money/http/client/TracingFutureCallback.scala b/money-http-async-client/src/main/scala/com/comcast/money/http/client/TracingFutureCallback.scala index 4d2fd2bc..b8474527 100644 --- a/money-http-async-client/src/main/scala/com/comcast/money/http/client/TracingFutureCallback.scala +++ b/money-http-async-client/src/main/scala/com/comcast/money/http/client/TracingFutureCallback.scala @@ -4,15 +4,15 @@ import com.comcast.money.core.state.State import org.apache.http.concurrent.FutureCallback class TracingFutureCallback[T](wrappee: Option[FutureCallback[T]], state: State) extends FutureCallback[T] { - override def failed(ex: Exception): Unit = state.restore() { + override def failed(ex: Exception): Unit = state.restore { wrappee.foreach(_.failed(ex)) } - override def completed(result: T): Unit = state.restore() { + override def completed(result: T): Unit = state.restore { wrappee.foreach(_.completed(result)) } - override def cancelled(): Unit = state.restore() { + override def cancelled(): Unit = state.restore { wrappee.foreach(_.cancelled()) } } From 449b0a6f1d73ecf2dae83e8ede8e5ee735b96ef3 Mon Sep 17 00:00:00 2001 From: "Spindler, Justin" Date: Tue, 13 Feb 2018 15:03:56 -0500 Subject: [PATCH 3/9] Allow restore to return a value --- .../src/main/scala/com/comcast/money/core/state/State.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/money-core/src/main/scala/com/comcast/money/core/state/State.scala b/money-core/src/main/scala/com/comcast/money/core/state/State.scala index 9e575045..f7062bf1 100644 --- a/money-core/src/main/scala/com/comcast/money/core/state/State.scala +++ b/money-core/src/main/scala/com/comcast/money/core/state/State.scala @@ -41,7 +41,7 @@ object State { trait State { def restore(): RestoredState - def restore(f: => Unit): Unit = { + def restore[T](f: => T): T = { val restoredState = restore() try { f From 30ad77dd448816a7378120d6726603f3fa2c62e2 Mon Sep 17 00:00:00 2001 From: "Spindler, Justin" Date: Mon, 26 Feb 2018 14:54:38 -0500 Subject: [PATCH 4/9] Fix Scala compilation errors (wrong version) and consolidate implementation --- .../TraceFriendlyThreadPoolExecutor.scala | 24 +-- .../com/comcast/money/core/state/State.scala | 16 +- .../src/main/resources/reference.conf | 24 +++ .../http/client/HttpAsyncTraceConfig.scala | 18 +- .../client/TraceFriendlyHttpAsyncClient.scala | 178 +++++++++++++----- .../http/client/TracingFutureCallback.scala | 18 -- .../TracingHttpAsyncRequestProducer.scala | 24 --- .../TracingHttpAsyncResponseConsumer.scala | 34 ---- project/MoneyBuild.scala | 2 +- 9 files changed, 190 insertions(+), 148 deletions(-) create mode 100644 money-http-async-client/src/main/resources/reference.conf delete mode 100644 money-http-async-client/src/main/scala/com/comcast/money/http/client/TracingFutureCallback.scala delete mode 100644 money-http-async-client/src/main/scala/com/comcast/money/http/client/TracingHttpAsyncRequestProducer.scala delete mode 100644 money-http-async-client/src/main/scala/com/comcast/money/http/client/TracingHttpAsyncResponseConsumer.scala diff --git a/money-core/src/main/scala/com/comcast/money/core/concurrent/TraceFriendlyThreadPoolExecutor.scala b/money-core/src/main/scala/com/comcast/money/core/concurrent/TraceFriendlyThreadPoolExecutor.scala index 26d6ae3c..358d4112 100644 --- a/money-core/src/main/scala/com/comcast/money/core/concurrent/TraceFriendlyThreadPoolExecutor.scala +++ b/money-core/src/main/scala/com/comcast/money/core/concurrent/TraceFriendlyThreadPoolExecutor.scala @@ -65,23 +65,19 @@ class TraceFriendlyThreadPoolExecutor(corePoolSize: Int, maximumPoolSize: Int, k setRejectedExecutionHandler(rejectedExecutionHandler) } - override def execute(command: Runnable) = { + override def execute(command: Runnable) { val state = State.capture() - super.execute( - new Runnable { - override def run = { - state.restore() { - try { - command.run() - } catch { - case t: Throwable => - logException(t) - throw t - } - } + super.execute(new Runnable { + override def run(): Unit = state.restore { + try { + command.run() + } catch { + case t: Throwable => + logException(t) + throw t } } - ) + }) } } diff --git a/money-core/src/main/scala/com/comcast/money/core/state/State.scala b/money-core/src/main/scala/com/comcast/money/core/state/State.scala index f7062bf1..9821afc0 100644 --- a/money-core/src/main/scala/com/comcast/money/core/state/State.scala +++ b/money-core/src/main/scala/com/comcast/money/core/state/State.scala @@ -26,13 +26,17 @@ object State { val span = SpanLocal.current val mdc = MDC.getCopyOfContextMap - () => { - mdcSupport.propogateMDC(Option(mdc)) - SpanLocal.clear() - span.foreach(SpanLocal.push) - () => { - MDC.clear() + new State { + override def restore(): RestoredState = { + mdcSupport.propogateMDC(Option(mdc)) SpanLocal.clear() + span.foreach(SpanLocal.push) + new RestoredState { + override def close(): Unit = { + MDC.clear() + SpanLocal.clear() + } + } } } } diff --git a/money-http-async-client/src/main/resources/reference.conf b/money-http-async-client/src/main/resources/reference.conf new file mode 100644 index 00000000..2622253c --- /dev/null +++ b/money-http-async-client/src/main/resources/reference.conf @@ -0,0 +1,24 @@ +# Copyright 2012-2015 Comcast Cable Communications Management, LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +money { + http-client { + metric-names { + http-call-duration = "http-call-duration" + http-call-with-body-duration = "http-call-with-body-duration" + http-process-response-duration = "http-process-response-duration" + http-response-code = "http-response-code" + } + } +} diff --git a/money-http-async-client/src/main/scala/com/comcast/money/http/client/HttpAsyncTraceConfig.scala b/money-http-async-client/src/main/scala/com/comcast/money/http/client/HttpAsyncTraceConfig.scala index e506afd7..3db8f7ed 100644 --- a/money-http-async-client/src/main/scala/com/comcast/money/http/client/HttpAsyncTraceConfig.scala +++ b/money-http-async-client/src/main/scala/com/comcast/money/http/client/HttpAsyncTraceConfig.scala @@ -1,3 +1,19 @@ +/* + * Copyright 2012-2015 Comcast Cable Communications Management, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.comcast.money.http.client object HttpAsyncTraceConfig { @@ -5,4 +21,4 @@ object HttpAsyncTraceConfig { lazy val HttpFullResponseTimeTraceKey: String = "http-call-with-body-duration" lazy val ProcessResponseTimeTraceKey: String = "http-process-response-duration" lazy val HttpResponseCodeTraceKey: String = "http-response-code" -} \ No newline at end of file +} diff --git a/money-http-async-client/src/main/scala/com/comcast/money/http/client/TraceFriendlyHttpAsyncClient.scala b/money-http-async-client/src/main/scala/com/comcast/money/http/client/TraceFriendlyHttpAsyncClient.scala index 1db1398e..6ba729eb 100644 --- a/money-http-async-client/src/main/scala/com/comcast/money/http/client/TraceFriendlyHttpAsyncClient.scala +++ b/money-http-async-client/src/main/scala/com/comcast/money/http/client/TraceFriendlyHttpAsyncClient.scala @@ -17,32 +17,34 @@ package com.comcast.money.http.client import java.io.Closeable -import java.util.concurrent.{CancellationException, Future} +import java.util.concurrent.{ CancellationException, Future } import com.comcast.money.api.Span -import com.comcast.money.core.{Formatters, Money, Tracer} +import com.comcast.money.core.{ Formatters, Money, Tracer } import org.apache.http.protocol.HttpContext -import org.apache.http.{HttpHost, HttpRequest, HttpResponse} +import org.apache.http.{ HttpHost, HttpRequest, HttpResponse } import com.comcast.money.core.internal.SpanLocal import com.comcast.money.core.state.State import org.apache.http.client.methods.HttpUriRequest import org.apache.http.concurrent.FutureCallback import org.apache.http.impl.nio.client.CloseableHttpAsyncClient +import org.apache.http.nio.{ ContentDecoder, ContentEncoder, IOControl } import org.apache.http.nio.client.HttpAsyncClient -import org.apache.http.nio.protocol.{HttpAsyncRequestProducer, HttpAsyncResponseConsumer} +import org.apache.http.nio.protocol.{ HttpAsyncRequestProducer, HttpAsyncResponseConsumer } -import scala.util.Try +import scala.util.{ Failure, Try } object TraceFriendlyHttpAsyncSupport { - def wrapExecute(httpRequest: HttpRequest, - callback: FutureCallback[HttpResponse], - tracer: Tracer) - (f: FutureCallback[HttpResponse] => Future[HttpResponse]): Future[HttpResponse] = { + def wrapExecute( + httpRequest: HttpRequest, + callback: FutureCallback[HttpResponse], + tracer: Tracer + )(f: FutureCallback[HttpResponse] => Future[HttpResponse]): Future[HttpResponse] = { val state = State.capture() TraceFriendlyHttpAsyncSupport.addTraceHeader(Option(httpRequest), SpanLocal.current) - val cb = new TracingFutureHttpResponseCallback(Option(callback), state, (response: Try[HttpResponse]) => { + val tracingCallback = new TracingFutureHttpResponseCallback(Option(callback), state, (response: Try[HttpResponse]) => { tracer.stopTimer(HttpAsyncTraceConfig.HttpResponseTimeTraceKey) val responseCode = getResponseCode(response) tracer.record(HttpAsyncTraceConfig.HttpResponseCodeTraceKey, responseCode) @@ -50,87 +52,103 @@ object TraceFriendlyHttpAsyncSupport { tracer.startTimer(HttpAsyncTraceConfig.HttpResponseTimeTraceKey) - f(cb) + f(tracingCallback) } - def wrapExecute[T](requestProducer: HttpAsyncRequestProducer, - responseConsumer: HttpAsyncResponseConsumer[T], - callback: FutureCallback[T], - tracer: Tracer) - (f: (HttpAsyncRequestProducer, HttpAsyncResponseConsumer[T], FutureCallback[T]) => Future[T]): Future[T] = { + def wrapExecute[T]( + requestProducer: HttpAsyncRequestProducer, + responseConsumer: HttpAsyncResponseConsumer[T], + callback: FutureCallback[T], + tracer: Tracer + )(f: (HttpAsyncRequestProducer, HttpAsyncResponseConsumer[T], FutureCallback[T]) => Future[T]): Future[T] = { val state = State.capture() val span = SpanLocal.current - val p = new TracingHttpAsyncRequestProducer(requestProducer, (httpRequest: HttpRequest) => { + val tracingRequestProducer = new TracingHttpAsyncRequestProducer(requestProducer, (httpRequest: HttpRequest) => { TraceFriendlyHttpAsyncSupport.addTraceHeader(Option(httpRequest), span) tracer.startTimer(HttpAsyncTraceConfig.HttpResponseTimeTraceKey) httpRequest }) - val c = new TracingHttpAsyncResponseConsumer[T](responseConsumer, (httpResponse: Try[HttpResponse]) => { + val tracingResponseConsumer = new TracingHttpAsyncResponseConsumer[T](responseConsumer, (httpResponse: Try[HttpResponse]) => { tracer.stopTimer(HttpAsyncTraceConfig.HttpResponseTimeTraceKey) val responseCode = getResponseCode(httpResponse) tracer.record(HttpAsyncTraceConfig.HttpResponseCodeTraceKey, responseCode) httpResponse }) - val cb = new TracingFutureCallback[T](Option(callback), state) - f(p, c, cb) + val tracingCallback = new TracingFutureCallback[T](Option(callback), state) + f(tracingRequestProducer, tracingResponseConsumer, tracingCallback) } def addTraceHeader(httpRequest: Option[HttpRequest], currentSpan: Option[Span]) { (httpRequest, currentSpan) match { - case (Some(r), Some(s)) => r.setHeader("X-MoneyTrace", Formatters.toHttpHeader(s.info.id)) + case (Some(request), Some(span)) => request.setHeader("X-MoneyTrace", Formatters.toHttpHeader(span.info.id)) } } def getResponseCode(response: Try[HttpResponse]): Int = { - response map(_.getStatusLine) map(_.getStatusCode) getOrElse 0 + response map (_.getStatusLine) map (_.getStatusCode) getOrElse 0 } } -class TraceFriendlyHttpAsyncClient(wrappee : HttpAsyncClient) extends HttpAsyncClient with java.io.Closeable { +class TraceFriendlyHttpAsyncClient(wrappee: HttpAsyncClient) extends HttpAsyncClient + with java.io.Closeable { + import com.comcast.money.http.client.TraceFriendlyHttpAsyncSupport._ - private val tracer = Money.Environment.tracer + val tracer = Money.Environment.tracer - override def execute[T](requestProducer: HttpAsyncRequestProducer, - responseConsumer: HttpAsyncResponseConsumer[T], - context: HttpContext, - callback: FutureCallback[T]): Future[T] = + override def execute[T]( + requestProducer: HttpAsyncRequestProducer, + responseConsumer: HttpAsyncResponseConsumer[T], + context: HttpContext, + callback: FutureCallback[T] + ): Future[T] = wrapExecute(requestProducer, responseConsumer, callback, tracer) { - (p, c, cb) => wrappee.execute(p, c, context, cb) + (tracingRequestProducer, tracingResponseConsumer, tracingCallback) => wrappee.execute(tracingRequestProducer, tracingResponseConsumer, context, tracingCallback) } - override def execute[T](requestProducer: HttpAsyncRequestProducer, - responseConsumer: HttpAsyncResponseConsumer[T], - callback: FutureCallback[T]): Future[T] = + override def execute[T]( + requestProducer: HttpAsyncRequestProducer, + responseConsumer: HttpAsyncResponseConsumer[T], + callback: FutureCallback[T] + ): Future[T] = wrapExecute(requestProducer, responseConsumer, callback, tracer) { - (p, c, cb) => wrappee.execute(p, c, cb) + (tracingRequestProducer, tracingResponseConsumer, tracingCallback) => wrappee.execute(tracingRequestProducer, tracingResponseConsumer, tracingCallback) } - override def execute(target: HttpHost, - request: HttpRequest, - context: HttpContext, - callback: FutureCallback[HttpResponse]): Future[HttpResponse] = + override def execute( + target: HttpHost, + request: HttpRequest, + context: HttpContext, + callback: FutureCallback[HttpResponse] + ): Future[HttpResponse] = wrapExecute(request, callback, tracer) { - cb => wrappee.execute(target, request, context, cb) + tracingCallback => wrappee.execute(target, request, context, tracingCallback) } - override def execute(target: HttpHost, - request: HttpRequest, - callback: FutureCallback[HttpResponse]): Future[HttpResponse] = + override def execute( + target: HttpHost, + request: HttpRequest, + callback: FutureCallback[HttpResponse] + ): Future[HttpResponse] = wrapExecute(request, callback, tracer) { - cb => wrappee.execute(target, request, cb) + tracingCallback => wrappee.execute(target, request, tracingCallback) } - override def execute(request: HttpUriRequest, - context: HttpContext, - callback: FutureCallback[HttpResponse]): Future[HttpResponse] = + override def execute( + request: HttpUriRequest, + context: HttpContext, + callback: FutureCallback[HttpResponse] + ): Future[HttpResponse] = wrapExecute(request, callback, tracer) { - cb => wrappee.execute(request, context, cb) + tracingCallback => wrappee.execute(request, context, tracingCallback) } - override def execute(request: HttpUriRequest, callback: FutureCallback[HttpResponse]): Future[HttpResponse] = + override def execute( + request: HttpUriRequest, + callback: FutureCallback[HttpResponse] + ): Future[HttpResponse] = wrapExecute(request, callback, tracer) { - cb => wrappee.execute(request, cb) + tracingCallback => wrappee.execute(request, tracingCallback) } override def close(): Unit = { @@ -146,9 +164,25 @@ class TraceFriendlyHttpAsyncClient(wrappee : HttpAsyncClient) extends HttpAsyncC } } +class TracingFutureCallback[T](wrappee: Option[FutureCallback[T]], state: State) extends FutureCallback[T] { + override def failed(ex: Exception): Unit = state.restore { + wrappee.foreach { _.failed(ex) } + } + + override def completed(result: T): Unit = state.restore { + wrappee.foreach { + _.completed(result) + } + } + + override def cancelled(): Unit = state.restore { + wrappee.foreach { _.cancelled() } + } +} + class TracingFutureHttpResponseCallback(wrappee: Option[FutureCallback[HttpResponse]], state: State, f: Try[HttpResponse] => Unit) extends FutureCallback[HttpResponse] { override def failed(ex: Exception): Unit = state.restore { - f(Try(ex)) + f(Failure(ex)) wrappee.foreach(_.failed(ex)) } @@ -158,7 +192,51 @@ class TracingFutureHttpResponseCallback(wrappee: Option[FutureCallback[HttpRespo } override def cancelled(): Unit = state.restore { - f(Try(new CancellationException())) + f(Failure(new CancellationException())) wrappee.foreach(_.cancelled()) } +} + +class TracingHttpAsyncRequestProducer(wrappee: HttpAsyncRequestProducer, f: HttpRequest => HttpRequest) extends HttpAsyncRequestProducer { + override def resetRequest(): Unit = wrappee.resetRequest() + + override def requestCompleted(context: HttpContext): Unit = wrappee.requestCompleted(context) + + override def failed(ex: Exception): Unit = wrappee.failed(ex) + + override def generateRequest(): HttpRequest = f(wrappee.generateRequest()) + + override def isRepeatable: Boolean = wrappee.isRepeatable + + override def getTarget: HttpHost = wrappee.getTarget + + override def produceContent(encoder: ContentEncoder, ioctrl: IOControl): Unit = wrappee.produceContent(encoder, ioctrl) + + override def close(): Unit = wrappee.close() +} + +class TracingHttpAsyncResponseConsumer[T](wrappee: HttpAsyncResponseConsumer[T], f: Try[HttpResponse] => Unit) extends HttpAsyncResponseConsumer[T] { + override def getException: Exception = wrappee.getException + + override def failed(ex: Exception): Unit = { + f(Failure(ex)) + wrappee.failed(ex) + } + + override def responseReceived(response: HttpResponse): Unit = { + f(Try(response)) + wrappee.responseReceived(response) + } + + override def isDone: Boolean = wrappee.isDone + + override def responseCompleted(context: HttpContext): Unit = wrappee.responseCompleted(context) + + override def getResult: T = wrappee.getResult + + override def consumeContent(decoder: ContentDecoder, ioctrl: IOControl): Unit = wrappee.consumeContent(decoder, ioctrl) + + override def cancel(): Boolean = wrappee.cancel() + + override def close(): Unit = wrappee.close() } \ No newline at end of file diff --git a/money-http-async-client/src/main/scala/com/comcast/money/http/client/TracingFutureCallback.scala b/money-http-async-client/src/main/scala/com/comcast/money/http/client/TracingFutureCallback.scala deleted file mode 100644 index b8474527..00000000 --- a/money-http-async-client/src/main/scala/com/comcast/money/http/client/TracingFutureCallback.scala +++ /dev/null @@ -1,18 +0,0 @@ -package com.comcast.money.http.client - -import com.comcast.money.core.state.State -import org.apache.http.concurrent.FutureCallback - -class TracingFutureCallback[T](wrappee: Option[FutureCallback[T]], state: State) extends FutureCallback[T] { - override def failed(ex: Exception): Unit = state.restore { - wrappee.foreach(_.failed(ex)) - } - - override def completed(result: T): Unit = state.restore { - wrappee.foreach(_.completed(result)) - } - - override def cancelled(): Unit = state.restore { - wrappee.foreach(_.cancelled()) - } -} diff --git a/money-http-async-client/src/main/scala/com/comcast/money/http/client/TracingHttpAsyncRequestProducer.scala b/money-http-async-client/src/main/scala/com/comcast/money/http/client/TracingHttpAsyncRequestProducer.scala deleted file mode 100644 index 44d3752e..00000000 --- a/money-http-async-client/src/main/scala/com/comcast/money/http/client/TracingHttpAsyncRequestProducer.scala +++ /dev/null @@ -1,24 +0,0 @@ -package com.comcast.money.http.client - -import org.apache.http.{HttpHost, HttpRequest} -import org.apache.http.nio.{ContentEncoder, IOControl} -import org.apache.http.nio.protocol.HttpAsyncRequestProducer -import org.apache.http.protocol.HttpContext - -class TracingHttpAsyncRequestProducer(wrappee: HttpAsyncRequestProducer, f: HttpRequest => HttpRequest) extends HttpAsyncRequestProducer { - override def resetRequest(): Unit = wrappee.resetRequest() - - override def requestCompleted(context: HttpContext): Unit = wrappee.requestCompleted(context) - - override def failed(ex: Exception): Unit = wrappee.failed(ex) - - override def generateRequest(): HttpRequest = f(wrappee.generateRequest()) - - override def isRepeatable: Boolean = wrappee.isRepeatable - - override def getTarget: HttpHost = wrappee.getTarget - - override def produceContent(encoder: ContentEncoder, ioctrl: IOControl): Unit = wrappee.produceContent(encoder, ioctrl) - - override def close(): Unit = wrappee.close() -} diff --git a/money-http-async-client/src/main/scala/com/comcast/money/http/client/TracingHttpAsyncResponseConsumer.scala b/money-http-async-client/src/main/scala/com/comcast/money/http/client/TracingHttpAsyncResponseConsumer.scala deleted file mode 100644 index 6bbabf30..00000000 --- a/money-http-async-client/src/main/scala/com/comcast/money/http/client/TracingHttpAsyncResponseConsumer.scala +++ /dev/null @@ -1,34 +0,0 @@ -package com.comcast.money.http.client - -import org.apache.http.HttpResponse -import org.apache.http.nio.{ContentDecoder, IOControl} -import org.apache.http.nio.protocol.HttpAsyncResponseConsumer -import org.apache.http.protocol.HttpContext - -import scala.util.Try - -class TracingHttpAsyncResponseConsumer[T](wrappee: HttpAsyncResponseConsumer[T], f: Try[HttpResponse] => Unit) extends HttpAsyncResponseConsumer[T] { - override def getException: Exception = wrappee.getException - - override def failed(ex: Exception): Unit = { - f(Try(ex)) - wrappee.failed(ex) - } - - override def responseReceived(response: HttpResponse): Unit = { - f(Try(response)) - wrappee.responseReceived(response) - } - - override def isDone: Boolean = wrappee.isDone - - override def responseCompleted(context: HttpContext): Unit = wrappee.responseCompleted(context) - - override def getResult: T = wrappee.getResult - - override def consumeContent(decoder: ContentDecoder, ioctrl: IOControl): Unit = wrappee.consumeContent(decoder, ioctrl) - - override def cancel(): Boolean = wrappee.cancel() - - override def close(): Unit = wrappee.close() -} \ No newline at end of file diff --git a/project/MoneyBuild.scala b/project/MoneyBuild.scala index 3a664d2f..20938bbe 100644 --- a/project/MoneyBuild.scala +++ b/project/MoneyBuild.scala @@ -26,7 +26,7 @@ object MoneyBuild extends Build { publishLocal := {}, publish := {} ) - .aggregate(moneyApi, moneyCore, moneyAspectj, moneyHttpClient, moneyJavaServlet, moneyKafka, moneySpring, moneySpring3, moneyWire) + .aggregate(moneyApi, moneyCore, moneyAspectj, moneyHttpClient, moneyHttpAsyncClient, moneyJavaServlet, moneyKafka, moneySpring, moneySpring3, moneyWire) lazy val moneyApi = Project("money-api", file("./money-api")) From 06791f6715a1bf620fd85928a5f9f5c0ef597399 Mon Sep 17 00:00:00 2001 From: "Spindler, Justin" Date: Mon, 26 Feb 2018 14:55:03 -0500 Subject: [PATCH 5/9] Add unit tests for TraceFriendlyHttpAsyncClient --- .../src/test/resources/application.conf | 47 ++++ .../TraceFriendlyHttpAsyncClientSpec.scala | 208 ++++++++++++++++++ 2 files changed, 255 insertions(+) create mode 100644 money-http-async-client/src/test/resources/application.conf create mode 100644 money-http-async-client/src/test/scala/com/comcast/money/http/client/TraceFriendlyHttpAsyncClientSpec.scala diff --git a/money-http-async-client/src/test/resources/application.conf b/money-http-async-client/src/test/resources/application.conf new file mode 100644 index 00000000..d2f1780b --- /dev/null +++ b/money-http-async-client/src/test/resources/application.conf @@ -0,0 +1,47 @@ +# Copyright 2012-2015 Comcast Cable Communications Management, LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +money { + enabled = true + mdc.enabled = true + application-name = "unknown" + log-exceptions = false + + handling = { + async = true + handlers = [ + { + class = "com.comcast.money.core.LogRecorderSpanHandler" + log-level = "INFO" + formatting { + span-start = "Span: " + null-value = "NULL" + log-template = "[ %s=%s ]" + span-duration-ms-enabled = "false" + keys { + span-id = "span-id" + trace-id = "trace-id" + parent-id = "parent-id" + span-name = "span-name" + app-name = "app-name" + start-time = "start-time" + span-duration = "span-duration" + span-duration-ms = "span-duration-ms" + span-success = "span-success" + } + } + } + ] + } +} diff --git a/money-http-async-client/src/test/scala/com/comcast/money/http/client/TraceFriendlyHttpAsyncClientSpec.scala b/money-http-async-client/src/test/scala/com/comcast/money/http/client/TraceFriendlyHttpAsyncClientSpec.scala new file mode 100644 index 00000000..49b3c8f7 --- /dev/null +++ b/money-http-async-client/src/test/scala/com/comcast/money/http/client/TraceFriendlyHttpAsyncClientSpec.scala @@ -0,0 +1,208 @@ +/* + * Copyright 2012-2015 Comcast Cable Communications Management, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.comcast.money.http.client + +import java.io.Closeable +import java.util.concurrent.{ CompletableFuture, Executors, Future, TimeUnit } + +import com.comcast.money.api.SpanId +import com.comcast.money.core.{ SpecHelpers, Tracer, Formatters => CoreSpanId } +import com.comcast.money.core.internal.SpanLocal +import org.apache.http.client.methods.{ CloseableHttpResponse, HttpUriRequest } +import org.apache.http.concurrent.FutureCallback +import org.apache.http.impl.nio.client.CloseableHttpAsyncClient +import org.apache.http.nio.client.HttpAsyncClient +import org.apache.http.nio.protocol.{ HttpAsyncRequestProducer, HttpAsyncResponseConsumer } +import org.apache.http.protocol.HttpContext +import org.apache.http.{ HttpHost, HttpResponse, StatusLine } +import org.mockito.ArgumentCaptor +import org.mockito.Mockito._ +import org.mockito.Matchers.{ eq => argEq } +import org.mockito.invocation.InvocationOnMock +import org.mockito.stubbing.Answer +import org.scalatest._ +import org.scalatest.mock.MockitoSugar + +class TraceFriendlyHttpAsyncClientSpec extends WordSpec with SpecHelpers + with Matchers with MockitoSugar with OneInstancePerTest with BeforeAndAfterEach { + + val httpClient = mock[CloseableHttpAsyncClient] + val httpUriRequest = mock[HttpUriRequest] + val httpResponse = mock[CloseableHttpResponse] + val statusLine = mock[StatusLine] + val httpHost = new HttpHost("localhost") + val httpContext = mock[HttpContext] + val callback = mock[FutureCallback[HttpResponse]] + val requestProducer = mock[HttpAsyncRequestProducer] + val responseConsumer = mock[HttpAsyncResponseConsumer[HttpResponse]] + val spanId = new SpanId() + val future = new CompletableFuture[HttpResponse]() + + val callbackCaptor = ArgumentCaptor.forClass(classOf[FutureCallback[HttpResponse]]) + val producerCaptor = ArgumentCaptor.forClass(classOf[HttpAsyncRequestProducer]) + val consumerCaptor = ArgumentCaptor.forClass(classOf[HttpAsyncResponseConsumer[HttpResponse]]) + + val executor = Executors.newScheduledThreadPool(1) + + var callbackSpanId: Option[SpanId] = None + + when(httpResponse.getStatusLine).thenReturn(statusLine) + when(statusLine.getStatusCode).thenReturn(200) + + // extend what we are testing so we can use a mock tracer + val underTest = new TraceFriendlyHttpAsyncClient(httpClient) { + override val tracer = mock[Tracer] + } + + override def beforeEach(): Unit = { + SpanLocal.push(testSpan(spanId)) + doReturn(httpUriRequest).when(requestProducer).generateRequest() + doReturn(httpResponse).when(responseConsumer).getResult + + doAnswer(new Answer[Void] { + override def answer(invocation: InvocationOnMock): Void = { + // copy span ID of current thread to field to match in test + callbackSpanId = SpanLocal.current.map(_.info.id) + null + } + }).when(callback).completed(httpResponse) + } + + // if you don't reset, then the verifies are going to be off + override def afterEach() = { + reset(underTest.tracer, httpUriRequest, httpClient) + SpanLocal.clear() + } + + def verifyTracing() = { + verify(underTest.tracer).startTimer(HttpAsyncTraceConfig.HttpResponseTimeTraceKey) + verify(underTest.tracer).stopTimer(HttpAsyncTraceConfig.HttpResponseTimeTraceKey) + verify(underTest.tracer).record("http-response-code", 200L) + verify(httpUriRequest).setHeader("X-MoneyTrace", CoreSpanId.toHttpHeader(spanId)) + assert(callbackSpanId.contains(spanId)) + } + + def completeCallback(): Unit = { + val callback = callbackCaptor.getValue + val runnable = new Runnable { + override def run(): Unit = { + callback.completed(httpResponse) + future.complete(httpResponse) + } + } + executor.schedule(runnable, 50, TimeUnit.MILLISECONDS) + future.get() + } + + "TraceFriendlyHttpAsyncClient" should { + "record the status code and call duration when execute(HttpUriRequest, FutureCallback)" in { + underTest.execute(httpUriRequest, callback) + + verify(httpClient).execute(argEq(httpUriRequest), callbackCaptor.capture()) + completeCallback() + + verify(callback).completed(httpResponse) + verifyTracing() + } + "record the status code and call duration when execute(HttpUriRequest, HttpContext, FutureCallback)" in { + underTest.execute(httpUriRequest, httpContext, callback) + + verify(httpClient).execute(argEq(httpUriRequest), argEq(httpContext), callbackCaptor.capture()) + completeCallback() + + verify(callback).completed(httpResponse) + verifyTracing() + } + "record the status code and call duration when execute(HttpHost, HttpRequest, FutureCallback)" in { + underTest.execute(httpHost, httpUriRequest, callback) + + verify(httpClient).execute(argEq(httpHost), argEq(httpUriRequest), callbackCaptor.capture()) + completeCallback() + + verify(callback).completed(httpResponse) + verifyTracing() + } + "record the status code and call duration when execute(HttpHost, HttpRequest, HttpContext, FutureCallback)" in { + underTest.execute(httpHost, httpUriRequest, httpContext, callback) + + verify(httpClient).execute(argEq(httpHost), argEq(httpUriRequest), argEq(httpContext), callbackCaptor.capture()) + completeCallback() + + verify(callback).completed(httpResponse) + verifyTracing() + } + "record the status code and call duration when execute(HttpAsyncRequestProducer, HttpAsyncResponseConsumer, FutureCallback)" in { + underTest.execute(requestProducer, responseConsumer, callback) + + verify(httpClient).execute(producerCaptor.capture(), consumerCaptor.capture(), callbackCaptor.capture()) + producerCaptor.getValue.generateRequest() + val captured = consumerCaptor.getValue + captured.responseReceived(httpResponse) + captured.responseCompleted(httpContext) + completeCallback() + + verify(callback).completed(httpResponse) + verifyTracing() + } + "record the status code and call duration when execute(HttpAsyncRequestProducer, HttpAsyncResponseConsumer, HttpContext, FutureCallback)" in { + underTest.execute(requestProducer, responseConsumer, httpContext, callback) + + verify(httpClient).execute(producerCaptor.capture(), consumerCaptor.capture(), argEq(httpContext), callbackCaptor.capture()) + producerCaptor.getValue.generateRequest() + val capturedConsumer = consumerCaptor.getValue + capturedConsumer.responseReceived(httpResponse) + capturedConsumer.responseCompleted(httpContext) + completeCallback() + + verify(callback).completed(httpResponse) + verifyTracing() + } + "records a zero for a status code on exception" in { + underTest.execute(httpUriRequest, callback) + + verify(httpClient).execute(argEq(httpUriRequest), callbackCaptor.capture()) + callbackCaptor.getValue.failed(new RuntimeException("bad")) + + verify(underTest.tracer).record("http-response-code", 0L) + } + "calls close on closeable http client" in { + underTest.close() + verify(httpClient).close() + } + "calls close if the http client implements closable" in { + trait Closer extends HttpAsyncClient with Closeable + + val closeHttp = mock[Closer] + val closeTest = new TraceFriendlyHttpAsyncClient(closeHttp) { + override val tracer = mock[Tracer] + } + + closeTest.close() + verify(closeHttp).close() + } + "calls close if the http client implements auto closeable" in { + trait AutoCloser extends HttpAsyncClient with AutoCloseable + val autoCloseHttp = mock[AutoCloser] + val autoCloseTest = new TraceFriendlyHttpAsyncClient(autoCloseHttp) { + override val tracer = mock[Tracer] + } + + autoCloseTest.close() + verify(autoCloseHttp).close() + } + } +} From b1df0aed8fb7b4589ae9a1531dca2a0062620f65 Mon Sep 17 00:00:00 2001 From: "Spindler, Justin" Date: Mon, 26 Feb 2018 15:04:24 -0500 Subject: [PATCH 6/9] Propagate span onto request producer and response consumer threads --- .../client/TraceFriendlyHttpAsyncClient.scala | 59 ++++++++++--------- 1 file changed, 32 insertions(+), 27 deletions(-) diff --git a/money-http-async-client/src/main/scala/com/comcast/money/http/client/TraceFriendlyHttpAsyncClient.scala b/money-http-async-client/src/main/scala/com/comcast/money/http/client/TraceFriendlyHttpAsyncClient.scala index 6ba729eb..1dd629d5 100644 --- a/money-http-async-client/src/main/scala/com/comcast/money/http/client/TraceFriendlyHttpAsyncClient.scala +++ b/money-http-async-client/src/main/scala/com/comcast/money/http/client/TraceFriendlyHttpAsyncClient.scala @@ -63,12 +63,12 @@ object TraceFriendlyHttpAsyncSupport { )(f: (HttpAsyncRequestProducer, HttpAsyncResponseConsumer[T], FutureCallback[T]) => Future[T]): Future[T] = { val state = State.capture() val span = SpanLocal.current - val tracingRequestProducer = new TracingHttpAsyncRequestProducer(requestProducer, (httpRequest: HttpRequest) => { + val tracingRequestProducer = new TracingHttpAsyncRequestProducer(requestProducer, state, (httpRequest: HttpRequest) => { TraceFriendlyHttpAsyncSupport.addTraceHeader(Option(httpRequest), span) tracer.startTimer(HttpAsyncTraceConfig.HttpResponseTimeTraceKey) httpRequest }) - val tracingResponseConsumer = new TracingHttpAsyncResponseConsumer[T](responseConsumer, (httpResponse: Try[HttpResponse]) => { + val tracingResponseConsumer = new TracingHttpAsyncResponseConsumer[T](responseConsumer, state, (httpResponse: Try[HttpResponse]) => { tracer.stopTimer(HttpAsyncTraceConfig.HttpResponseTimeTraceKey) val responseCode = getResponseCode(httpResponse) tracer.record(HttpAsyncTraceConfig.HttpResponseCodeTraceKey, responseCode) @@ -197,46 +197,51 @@ class TracingFutureHttpResponseCallback(wrappee: Option[FutureCallback[HttpRespo } } -class TracingHttpAsyncRequestProducer(wrappee: HttpAsyncRequestProducer, f: HttpRequest => HttpRequest) extends HttpAsyncRequestProducer { - override def resetRequest(): Unit = wrappee.resetRequest() +class TracingHttpAsyncRequestProducer(wrappee: HttpAsyncRequestProducer, state: State, f: HttpRequest => HttpRequest) extends HttpAsyncRequestProducer { + override def generateRequest(): HttpRequest = state.restore { + f(wrappee.generateRequest()) + } - override def requestCompleted(context: HttpContext): Unit = wrappee.requestCompleted(context) + override def produceContent(encoder: ContentEncoder, ioctrl: IOControl): Unit = state.restore { + wrappee.produceContent(encoder, ioctrl) + } - override def failed(ex: Exception): Unit = wrappee.failed(ex) + override def requestCompleted(context: HttpContext): Unit = state.restore { + wrappee.requestCompleted(context) + } - override def generateRequest(): HttpRequest = f(wrappee.generateRequest()) + override def failed(ex: Exception): Unit = state.restore { + wrappee.failed(ex) + } override def isRepeatable: Boolean = wrappee.isRepeatable - override def getTarget: HttpHost = wrappee.getTarget - - override def produceContent(encoder: ContentEncoder, ioctrl: IOControl): Unit = wrappee.produceContent(encoder, ioctrl) - + override def resetRequest(): Unit = wrappee.resetRequest() override def close(): Unit = wrappee.close() } -class TracingHttpAsyncResponseConsumer[T](wrappee: HttpAsyncResponseConsumer[T], f: Try[HttpResponse] => Unit) extends HttpAsyncResponseConsumer[T] { +class TracingHttpAsyncResponseConsumer[T](wrappee: HttpAsyncResponseConsumer[T], state: State, f: Try[HttpResponse] => Unit) extends HttpAsyncResponseConsumer[T] { + override def isDone: Boolean = wrappee.isDone + override def getResult: T = wrappee.getResult override def getException: Exception = wrappee.getException + override def cancel(): Boolean = wrappee.cancel() + override def close(): Unit = wrappee.close() - override def failed(ex: Exception): Unit = { - f(Failure(ex)) - wrappee.failed(ex) - } - - override def responseReceived(response: HttpResponse): Unit = { + override def responseReceived(response: HttpResponse): Unit = state.restore { f(Try(response)) wrappee.responseReceived(response) } - override def isDone: Boolean = wrappee.isDone - - override def responseCompleted(context: HttpContext): Unit = wrappee.responseCompleted(context) - - override def getResult: T = wrappee.getResult - - override def consumeContent(decoder: ContentDecoder, ioctrl: IOControl): Unit = wrappee.consumeContent(decoder, ioctrl) + override def consumeContent(decoder: ContentDecoder, ioctrl: IOControl): Unit = state.restore { + wrappee.consumeContent(decoder, ioctrl) + } - override def cancel(): Boolean = wrappee.cancel() + override def responseCompleted(context: HttpContext): Unit = state.restore { + wrappee.responseCompleted(context) + } - override def close(): Unit = wrappee.close() + override def failed(ex: Exception): Unit = state.restore { + f(Failure(ex)) + wrappee.failed(ex) + } } \ No newline at end of file From 1cf42642a8aa6766c91097be2f243694719a8ed9 Mon Sep 17 00:00:00 2001 From: "Spindler, Justin" Date: Thu, 1 Mar 2018 10:14:05 -0500 Subject: [PATCH 7/9] Minor clean-up and add scaladocs --- .../money/core/state/RestoredState.scala | 2 + .../com/comcast/money/core/state/State.scala | 49 ++++++++++ .../client/TraceFriendlyHttpAsyncClient.scala | 91 +++++++++++-------- 3 files changed, 104 insertions(+), 38 deletions(-) diff --git a/money-core/src/main/scala/com/comcast/money/core/state/RestoredState.scala b/money-core/src/main/scala/com/comcast/money/core/state/RestoredState.scala index 231aa037..0b0c42f8 100644 --- a/money-core/src/main/scala/com/comcast/money/core/state/RestoredState.scala +++ b/money-core/src/main/scala/com/comcast/money/core/state/RestoredState.scala @@ -16,6 +16,8 @@ package com.comcast.money.core.state +/** Represents the restored tracing state. */ trait RestoredState extends AutoCloseable { + /** Reverts the restored tracing state on the current thread */ override def close(): Unit } diff --git a/money-core/src/main/scala/com/comcast/money/core/state/State.scala b/money-core/src/main/scala/com/comcast/money/core/state/State.scala index 9821afc0..9a1b76b4 100644 --- a/money-core/src/main/scala/com/comcast/money/core/state/State.scala +++ b/money-core/src/main/scala/com/comcast/money/core/state/State.scala @@ -22,6 +22,25 @@ import org.slf4j.MDC object State { private lazy val mdcSupport = new MDCSupport() + /** + * Captures the state of the current tracing span so that it can be restored onto + * a separate worker thread. + * + * {{{ + * import com.comcast.money.core.state + * + * def doSomethingAsynchronous(executor: ExecutorService) { + * val capturedState = State.capture() + * + * executor.submit(new Runnable { + * override def run(): Unit = state.restore { + * // resumes on captured + * } + * }) + * } + * }}} + * @return the captured tracing state + */ def capture(): State = { val span = SpanLocal.current val mdc = MDC.getCopyOfContextMap @@ -42,9 +61,39 @@ object State { } } +/** Represents the tracing state that has been captured from another thread */ trait State { + /** + * Restores the tracing state on the current thread returning a [[RestoredState]] + * that can be used with Java 7+ "try-with-resources" syntax to revert the + * restored state. + * + * {{{ + * State state = State.capture(); + * // later + * try (RestoredState restored = state.restore()) { + * // do something meaningful here + * } + * }}} + * @return the restored state + */ def restore(): RestoredState + /** + * Restores the tracing state on the current thread for the duration of the + * function. + * + * {{{ + * val state = State.capture + * // later + * state.restore { + * // do something meaningful here + * } + * }}} + * @param f the function to invoke + * @tparam T the return value of the function + * @return the value returned from the function + */ def restore[T](f: => T): T = { val restoredState = restore() try { diff --git a/money-http-async-client/src/main/scala/com/comcast/money/http/client/TraceFriendlyHttpAsyncClient.scala b/money-http-async-client/src/main/scala/com/comcast/money/http/client/TraceFriendlyHttpAsyncClient.scala index 1dd629d5..df3ce462 100644 --- a/money-http-async-client/src/main/scala/com/comcast/money/http/client/TraceFriendlyHttpAsyncClient.scala +++ b/money-http-async-client/src/main/scala/com/comcast/money/http/client/TraceFriendlyHttpAsyncClient.scala @@ -40,18 +40,26 @@ object TraceFriendlyHttpAsyncSupport { callback: FutureCallback[HttpResponse], tracer: Tracer )(f: FutureCallback[HttpResponse] => Future[HttpResponse]): Future[HttpResponse] = { + // capture the current tracing state... val state = State.capture() + // Put the X-MoneyTrace header in the request... TraceFriendlyHttpAsyncSupport.addTraceHeader(Option(httpRequest), SpanLocal.current) - val tracingCallback = new TracingFutureHttpResponseCallback(Option(callback), state, (response: Try[HttpResponse]) => { + // Start timing the execution of the request... + tracer.startTimer(HttpAsyncTraceConfig.HttpResponseTimeTraceKey) + + // Wrap the callback to intercept the response... + val tracingCallback = new TracingFutureCallback[HttpResponse](Option(callback), state, response => { + // Stop timing the execution of the request... tracer.stopTimer(HttpAsyncTraceConfig.HttpResponseTimeTraceKey) + + // Get the response code, will be 0 if response is null val responseCode = getResponseCode(response) tracer.record(HttpAsyncTraceConfig.HttpResponseCodeTraceKey, responseCode) }) - tracer.startTimer(HttpAsyncTraceConfig.HttpResponseTimeTraceKey) - + // Continue with the execution of the request... f(tracingCallback) } @@ -61,20 +69,35 @@ object TraceFriendlyHttpAsyncSupport { callback: FutureCallback[T], tracer: Tracer )(f: (HttpAsyncRequestProducer, HttpAsyncResponseConsumer[T], FutureCallback[T]) => Future[T]): Future[T] = { + // capture the current tracing state... val state = State.capture() val span = SpanLocal.current + + // Wrap the producer interface to intercept the request... val tracingRequestProducer = new TracingHttpAsyncRequestProducer(requestProducer, state, (httpRequest: HttpRequest) => { + // Put the X-MoneyTrace header in the request... TraceFriendlyHttpAsyncSupport.addTraceHeader(Option(httpRequest), span) + + // Start timing the execution of the request... tracer.startTimer(HttpAsyncTraceConfig.HttpResponseTimeTraceKey) httpRequest }) + + // Wrap the consumer interface to intercept the response... val tracingResponseConsumer = new TracingHttpAsyncResponseConsumer[T](responseConsumer, state, (httpResponse: Try[HttpResponse]) => { + // Stop timing the execution of the request... tracer.stopTimer(HttpAsyncTraceConfig.HttpResponseTimeTraceKey) + + // Get the response code, will be 0 if response is null val responseCode = getResponseCode(httpResponse) tracer.record(HttpAsyncTraceConfig.HttpResponseCodeTraceKey, responseCode) httpResponse }) - val tracingCallback = new TracingFutureCallback[T](Option(callback), state) + + // Wrap the callback interface to restore the tracing state on the callback thread... + val tracingCallback = new TracingFutureCallback[T](Option(callback), state, _ => {}) + + // Continue with the execution of the request... f(tracingRequestProducer, tracingResponseConsumer, tracingCallback) } @@ -84,11 +107,14 @@ object TraceFriendlyHttpAsyncSupport { } } - def getResponseCode(response: Try[HttpResponse]): Int = { + def getResponseCode(response: Try[HttpResponse]): Int = response map (_.getStatusLine) map (_.getStatusCode) getOrElse 0 - } } +/** + * Provides a thin wrapper around [[HttpAsyncClient]] to support automatically tracing + * requests and restoring the tracing state on the callback interfaces. + */ class TraceFriendlyHttpAsyncClient(wrappee: HttpAsyncClient) extends HttpAsyncClient with java.io.Closeable { @@ -164,29 +190,17 @@ class TraceFriendlyHttpAsyncClient(wrappee: HttpAsyncClient) extends HttpAsyncCl } } -class TracingFutureCallback[T](wrappee: Option[FutureCallback[T]], state: State) extends FutureCallback[T] { - override def failed(ex: Exception): Unit = state.restore { - wrappee.foreach { _.failed(ex) } - } - - override def completed(result: T): Unit = state.restore { - wrappee.foreach { - _.completed(result) - } - } - - override def cancelled(): Unit = state.restore { - wrappee.foreach { _.cancelled() } - } -} - -class TracingFutureHttpResponseCallback(wrappee: Option[FutureCallback[HttpResponse]], state: State, f: Try[HttpResponse] => Unit) extends FutureCallback[HttpResponse] { +/** + * Wraps the [[FutureCallback]] callback interface and restores the captured tracing + * state before invoking any of the wrapped methods. + */ +class TracingFutureCallback[T](wrappee: Option[FutureCallback[T]], state: State, f: Try[T] => Unit) extends FutureCallback[T] { override def failed(ex: Exception): Unit = state.restore { f(Failure(ex)) wrappee.foreach(_.failed(ex)) } - override def completed(result: HttpResponse): Unit = state.restore { + override def completed(result: T): Unit = state.restore { f(Try(result)) wrappee.foreach(_.completed(result)) } @@ -197,15 +211,14 @@ class TracingFutureHttpResponseCallback(wrappee: Option[FutureCallback[HttpRespo } } +/** + * Wraps the [[HttpAsyncRequestProducer]] interface to intercept the [[HttpRequest]]. + */ class TracingHttpAsyncRequestProducer(wrappee: HttpAsyncRequestProducer, state: State, f: HttpRequest => HttpRequest) extends HttpAsyncRequestProducer { override def generateRequest(): HttpRequest = state.restore { f(wrappee.generateRequest()) } - override def produceContent(encoder: ContentEncoder, ioctrl: IOControl): Unit = state.restore { - wrappee.produceContent(encoder, ioctrl) - } - override def requestCompleted(context: HttpContext): Unit = state.restore { wrappee.requestCompleted(context) } @@ -214,28 +227,23 @@ class TracingHttpAsyncRequestProducer(wrappee: HttpAsyncRequestProducer, state: wrappee.failed(ex) } + override def produceContent(encoder: ContentEncoder, ioctrl: IOControl): Unit = wrappee.produceContent(encoder, ioctrl) override def isRepeatable: Boolean = wrappee.isRepeatable override def getTarget: HttpHost = wrappee.getTarget override def resetRequest(): Unit = wrappee.resetRequest() override def close(): Unit = wrappee.close() } +/** + * Wraps the [[HttpAsyncResponseConsumer]] interface to intercept the completion of the + * HTTP request, handle the [[HttpResponse]] and to restore the captured tracing state. + */ class TracingHttpAsyncResponseConsumer[T](wrappee: HttpAsyncResponseConsumer[T], state: State, f: Try[HttpResponse] => Unit) extends HttpAsyncResponseConsumer[T] { - override def isDone: Boolean = wrappee.isDone - override def getResult: T = wrappee.getResult - override def getException: Exception = wrappee.getException - override def cancel(): Boolean = wrappee.cancel() - override def close(): Unit = wrappee.close() - override def responseReceived(response: HttpResponse): Unit = state.restore { f(Try(response)) wrappee.responseReceived(response) } - override def consumeContent(decoder: ContentDecoder, ioctrl: IOControl): Unit = state.restore { - wrappee.consumeContent(decoder, ioctrl) - } - override def responseCompleted(context: HttpContext): Unit = state.restore { wrappee.responseCompleted(context) } @@ -244,4 +252,11 @@ class TracingHttpAsyncResponseConsumer[T](wrappee: HttpAsyncResponseConsumer[T], f(Failure(ex)) wrappee.failed(ex) } + + override def consumeContent(decoder: ContentDecoder, ioctrl: IOControl): Unit = wrappee.consumeContent(decoder, ioctrl) + override def isDone: Boolean = wrappee.isDone + override def getResult: T = wrappee.getResult + override def getException: Exception = wrappee.getException + override def cancel(): Boolean = wrappee.cancel() + override def close(): Unit = wrappee.close() } \ No newline at end of file From 00793e391cf4a71195597e38a622973be3e92f3c Mon Sep 17 00:00:00 2001 From: "Spindler, Justin" Date: Thu, 1 Mar 2018 11:57:25 -0500 Subject: [PATCH 8/9] Add unit tests for callback interface wrappers --- .../TraceFriendlyHttpAsyncClientSpec.scala | 93 +++++++++++++++++++ 1 file changed, 93 insertions(+) diff --git a/money-http-async-client/src/test/scala/com/comcast/money/http/client/TraceFriendlyHttpAsyncClientSpec.scala b/money-http-async-client/src/test/scala/com/comcast/money/http/client/TraceFriendlyHttpAsyncClientSpec.scala index 49b3c8f7..15794a65 100644 --- a/money-http-async-client/src/test/scala/com/comcast/money/http/client/TraceFriendlyHttpAsyncClientSpec.scala +++ b/money-http-async-client/src/test/scala/com/comcast/money/http/client/TraceFriendlyHttpAsyncClientSpec.scala @@ -25,6 +25,7 @@ import com.comcast.money.core.internal.SpanLocal import org.apache.http.client.methods.{ CloseableHttpResponse, HttpUriRequest } import org.apache.http.concurrent.FutureCallback import org.apache.http.impl.nio.client.CloseableHttpAsyncClient +import org.apache.http.nio.{ ContentDecoder, ContentEncoder, IOControl } import org.apache.http.nio.client.HttpAsyncClient import org.apache.http.nio.protocol.{ HttpAsyncRequestProducer, HttpAsyncResponseConsumer } import org.apache.http.protocol.HttpContext @@ -179,6 +180,98 @@ class TraceFriendlyHttpAsyncClientSpec extends WordSpec with SpecHelpers verify(underTest.tracer).record("http-response-code", 0L) } + "wraps the HttpAsyncRequestProducer members" in { + underTest.execute(requestProducer, responseConsumer, httpContext, callback) + + verify(httpClient).execute(producerCaptor.capture(), consumerCaptor.capture(), argEq(httpContext), callbackCaptor.capture()) + + val capturedProducer = producerCaptor.getValue + verifyZeroInteractions(requestProducer) + + val exception = mock[Exception] + capturedProducer.failed(exception) + verify(requestProducer).failed(argEq(exception)) + + val contentEncoder = mock[ContentEncoder] + val ioControl = mock[IOControl] + capturedProducer.produceContent(contentEncoder, ioControl) + verify(requestProducer).produceContent(argEq(contentEncoder), argEq(ioControl)) + + val context = mock[HttpContext] + capturedProducer.requestCompleted(context) + verify(requestProducer).requestCompleted(argEq(context)) + + capturedProducer.resetRequest() + verify(requestProducer).resetRequest() + + capturedProducer.generateRequest() + verify(requestProducer).generateRequest() + + capturedProducer.getTarget + verify(requestProducer).getTarget + + capturedProducer.isRepeatable + verify(requestProducer).isRepeatable + + capturedProducer.close() + verify(requestProducer).close() + } + "wraps the HttpAsyncResponseConsumer members" in { + underTest.execute(requestProducer, responseConsumer, httpContext, callback) + + verify(httpClient).execute(producerCaptor.capture(), consumerCaptor.capture(), argEq(httpContext), callbackCaptor.capture()) + + val capturedConsumer = consumerCaptor.getValue + verifyZeroInteractions(responseConsumer) + + val contentDecoder = mock[ContentDecoder] + val ioControl = mock[IOControl] + capturedConsumer.consumeContent(contentDecoder, ioControl) + verify(responseConsumer).consumeContent(argEq(contentDecoder), argEq(ioControl)) + + val exception = mock[Exception] + capturedConsumer.failed(exception) + verify(responseConsumer).failed(argEq(exception)) + + val context = mock[HttpContext] + capturedConsumer.responseCompleted(context) + verify(responseConsumer).responseCompleted(argEq(context)) + + val response = mock[HttpResponse] + capturedConsumer.responseReceived(response) + verify(responseConsumer).responseReceived(argEq(response)) + + capturedConsumer.getException + verify(responseConsumer).getException + + capturedConsumer.getResult + verify(responseConsumer).getResult + + capturedConsumer.isDone + verify(responseConsumer).isDone + + capturedConsumer.close() + verify(responseConsumer).close() + } + "wraps the FutureCallback members" in { + underTest.execute(requestProducer, responseConsumer, httpContext, callback) + + verify(httpClient).execute(producerCaptor.capture(), consumerCaptor.capture(), argEq(httpContext), callbackCaptor.capture()) + + val capturedCallback = callbackCaptor.getValue + verifyZeroInteractions(callback) + + capturedCallback.cancelled() + verify(callback).cancelled() + + val response = mock[HttpResponse] + capturedCallback.completed(response) + verify(callback).completed(argEq(response)) + + val exception = mock[Exception] + capturedCallback.failed(exception) + verify(callback).failed(argEq(exception)) + } "calls close on closeable http client" in { underTest.close() verify(httpClient).close() From 5381eb79296edf377257549839df27ceb35282f6 Mon Sep 17 00:00:00 2001 From: "Spindler, Justin" Date: Thu, 1 Mar 2018 12:03:09 -0500 Subject: [PATCH 9/9] Kick TravisCI --- .../money/http/client/TraceFriendlyHttpAsyncClientSpec.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/money-http-async-client/src/test/scala/com/comcast/money/http/client/TraceFriendlyHttpAsyncClientSpec.scala b/money-http-async-client/src/test/scala/com/comcast/money/http/client/TraceFriendlyHttpAsyncClientSpec.scala index 15794a65..6a6a4ccc 100644 --- a/money-http-async-client/src/test/scala/com/comcast/money/http/client/TraceFriendlyHttpAsyncClientSpec.scala +++ b/money-http-async-client/src/test/scala/com/comcast/money/http/client/TraceFriendlyHttpAsyncClientSpec.scala @@ -50,6 +50,7 @@ class TraceFriendlyHttpAsyncClientSpec extends WordSpec with SpecHelpers val callback = mock[FutureCallback[HttpResponse]] val requestProducer = mock[HttpAsyncRequestProducer] val responseConsumer = mock[HttpAsyncResponseConsumer[HttpResponse]] + val spanId = new SpanId() val future = new CompletableFuture[HttpResponse]()