Skip to content

Commit

Permalink
#451: Add java friendly api for PipelineFlowFactory (#608)
Browse files Browse the repository at this point in the history
#451: Add java friendly api for PipelineFlowFactory & RequestContext
  • Loading branch information
sebady authored and anilgursel committed Mar 12, 2018
1 parent 0a33510 commit 82ef1b1
Show file tree
Hide file tree
Showing 20 changed files with 826 additions and 128 deletions.
91 changes: 85 additions & 6 deletions docs/pipeline.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,19 +103,42 @@ dummyflow {

A sample `DummyBidiFlow` looks like below:

##### Scala

```scala
class DummyBidiFlow extends PipelineFlowFactory {

override def create(context: Context)(implicit system: ActorSystem): PipelineFlow = {
BidiFlow.fromGraph(GraphDSL.create() { implicit b =>
val inbound = b.add(Flow[RequestContext].map { rc => rc.addRequestHeader(RawHeader("DummyRequest", "ReqValue")) })
val outbound = b.add(Flow[RequestContext].map{ rc => rc.addResponseHeader(RawHeader("DummyResponse", "ResValue"))})
val inbound = b.add(Flow[RequestContext].map { rc => rc.withRequestHeader(RawHeader("DummyRequest", "ReqValue")) })
val outbound = b.add(Flow[RequestContext].map{ rc => rc.withResponseHeader(RawHeader("DummyResponse", "ResValue"))})
BidiShape.fromFlows(inbound, outbound)
})
}
}
```

##### Java

```java
public class DummyBidiFlow extends AbstractPipelineFlowFactory {

@Override
public BidiFlow<RequestContext, RequestContext, RequestContext, RequestContext, NotUsed> create(Context context, ActorSystem system) {
return BidiFlow.fromGraph(GraphDSL.create(b -> {
final FlowShape<RequestContext, RequestContext> inbound = b.add(
Flow.of(RequestContext.class)
.map(rc -> rc.withRequestHeader(RawHeader.create("DummyRequest", "ReqValue"))));
final FlowShape<RequestContext, RequestContext> outbound = b.add(
Flow.of(RequestContext.class)
.map(rc -> rc.withResponseHeader(RawHeader.create("DummyResponse", "ResValue"))));

return BidiShape.fromFlows(inbound, outbound);
}));
}
}
```

#### Aborting the flow
In certain scenarios, a stage in pipeline may have a need to abort the flow and return an `HttpResponse`, e.g., in case of authentication/authorization. In such scenarios, the rest of the pipeline should be skipped and the request should not reach to the squbs service. To skip the rest of the flow:

Expand All @@ -124,17 +147,19 @@ In certain scenarios, a stage in pipeline may have a need to abort the flow and

In the below `DummyAbortableBidiFlow ` example, `authorization ` is a bidi flow with `abortable` and it aborts the flow is user is not authorized:

##### Scala

```scala
class DummyAbortableBidiFlow extends PipelineFlowFactory {

override def create(context: Context)(implicit system: ActorSystem): PipelineFlow = {

BidiFlow.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val inboundA = b.add(Flow[RequestContext].map { rc => rc.addRequestHeader(RawHeader("keyInA", "valInA")) })
val inboundC = b.add(Flow[RequestContext].map { rc => rc.addRequestHeader(RawHeader("keyInC", "valInC")) })
val outboundA = b.add(Flow[RequestContext].map { rc => rc.addResponseHeaders(RawHeader("keyOutA", "valOutA"))})
val outboundC = b.add(Flow[RequestContext].map { rc => rc.addResponseHeaders(RawHeader("keyOutC", "valOutC"))})
val inboundA = b.add(Flow[RequestContext].map { rc => rc.withRequestHeader(RawHeader("keyInA", "valInA")) })
val inboundC = b.add(Flow[RequestContext].map { rc => rc.withRequestHeader(RawHeader("keyInC", "valInC")) })
val outboundA = b.add(Flow[RequestContext].map { rc => rc.withResponseHeader(RawHeader("keyOutA", "valOutA"))})
val outboundC = b.add(Flow[RequestContext].map { rc => rc.withResponseHeader(RawHeader("keyOutC", "valOutC"))})

val inboundOutboundB = b.add(authorization abortable)

Expand All @@ -161,6 +186,60 @@ class DummyAbortableBidiFlow extends PipelineFlowFactory {
}
```

##### Java

```java
public class DummyAbortableBidiFlow extends japi.PipelineFlowFactory {

@Override
public BidiFlow<RequestContext, RequestContext, RequestContext, RequestContext, NotUsed>
create(Context context, ActorSystem system) {

return BidiFlow.fromGraph(GraphDSL.create(b -> {
final FlowShape<RequestContext, RequestContext> inboundA = b.add(
Flow.of(RequestContext.class)
.map(rc -> rc.withRequestHeader(RawHeader.create("keyInA", "valInA"))));
final FlowShape<RequestContext, RequestContext> inboundC = b.add(
Flow.of(RequestContext.class)
.map(rc -> rc.withRequestHeader(RawHeader.create("keyInC", "valInC"))));
final FlowShape<RequestContext, RequestContext> outboundA = b.add(
Flow.of(RequestContext.class)
.map(rc -> rc.withRequestHeader(RawHeader.create("keyOutA", "valOutA"))));
final FlowShape<RequestContext, RequestContext> outboundC = b.add(
Flow.of(RequestContext.class)
.map(rc -> rc.withResponseHeader(RawHeader.create("keyOutC", "valOutC"))));

final BidiShape<RequestContext, RequestContext> inboundOutboundB =
b.add(abortable(authorization));

b.from(inboundA).toInlet(inboundOutboundB.in1());
b.to(inboundC).fromOutlet(inboundOutboundB.out1());
b.from(outboundC).toInlet(inboundOutboundB.in2());
b.to(outboundA).fromOutlet(inboundOutboundB.out2());

return new BidiShape<>(inboundA.in(), inboundC.out(), outboundC.in(), outboundA.out());
}));
}

final BidiFlow<RequestContext, RequestContext, RequestContext, RequestContext, NotUsed> authorization =
BidiFlow.fromGraph(GraphDSL.create(b -> {
final FlowShape<RequestContext, RequestContext> authorization = b.add(
Flow.of(RequestContext.class)
.map(rc -> {
if (!isAuthorized()) {
rc.abortWith(HttpResponse.create()
.withStatus(StatusCodes.Unauthorized()).withEntity("Not Authorized!"));
} else return rc;
}));

FlowShape<RequestContext, RequestContext, NotUsed> noneFlow = b.add(
Flow.of(RequestContext.class));

return BidiShape.fromFlows(authorization, noneFlow);
}));
}
```

Once a flow is added with `abortable`, a bidi flow gets connected. This bidi flow checks the existence of `HttpResponse` and bypasses or sends the request downstream. Here is how the above `DummyAbortableBidiFlow` looks:


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ object MetricsFlow {

val inbound = Flow[RequestContext].map { rc =>
metrics.meter(requestCount).mark()
rc ++ (requestTime -> metrics.timer(requestTime).time())
rc.withAttribute(requestTime, metrics.timer(requestTime).time())
}

val outbound = Flow[RequestContext].map { rc =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ object ClientFlow {
PipelineExtension(system).getFlow(pipelineSetting, Context(name, ClientPipeline)) match {
case Some(pipeline) =>
val tupleToRequestContext = Flow[(HttpRequest, T)].map { case (request, t) =>
RequestContext(request, 0) ++ (AkkaHttpClientCustomContext -> t)
RequestContext(request, 0).withAttribute(AkkaHttpClientCustomContext, t)
}

val fromRequestContextToTuple = Flow[RequestContext].map { rc =>
Expand All @@ -274,7 +274,7 @@ object ClientFlow {
})
case None =>
val customContextToRequestContext = Flow[(HttpRequest, T)].map { case (request, t) =>
(request, RequestContext(request, 0) ++ (AkkaHttpClientCustomContext -> t))
(request, RequestContext(request, 0).withAttribute(AkkaHttpClientCustomContext, t))
}

val requestContextToCustomContext =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,10 +176,10 @@ class DummyFlow extends PipelineFlowFactory {
BidiFlow.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._

val stageA = b.add(Flow[RequestContext].map { rc => rc.addRequestHeaders(RawHeader("keyA", "valA")) })
val stageB = b.add(Flow[RequestContext].map { rc => rc.addRequestHeaders(RawHeader("keyB", "valB")) })
val stageA = b.add(Flow[RequestContext].map { rc => rc.withRequestHeader(RawHeader("keyA", "valA")) })
val stageB = b.add(Flow[RequestContext].map { rc => rc.withRequestHeader(RawHeader("keyB", "valB")) })
val stageC = b.add(dummyBidi)
val stageD = b.add(Flow[RequestContext].map { rc => rc.addResponseHeaders(RawHeader("keyD", "valD")) })
val stageD = b.add(Flow[RequestContext].map { rc => rc.withResponseHeader(RawHeader("keyD", "valD")) })

stageA ~> stageB ~> stageC.in1
stageD <~ stageC.out2
Expand All @@ -188,24 +188,24 @@ class DummyFlow extends PipelineFlowFactory {
})
}

val requestFlow = Flow[RequestContext].map { rc => rc.addRequestHeaders(RawHeader("keyC", "valC")) }
val requestFlow = Flow[RequestContext].map { rc => rc.withRequestHeader(RawHeader("keyC", "valC")) }
val dummyBidi = BidiFlow.fromFlows(requestFlow, Flow[RequestContext])
}

class PreFlow extends PipelineFlowFactory {

override def create(context: Context)(implicit system: ActorSystem): PipelineFlow = {
val inbound = Flow[RequestContext].map { rc => rc.addRequestHeaders(RawHeader("keyPreInbound", "valPreInbound")) }
val outbound = Flow[RequestContext].map { rc => rc.addResponseHeaders(RawHeader("keyPreOutbound", "valPreOutbound")) }
val inbound = Flow[RequestContext].map { rc => rc.withRequestHeader(RawHeader("keyPreInbound", "valPreInbound")) }
val outbound = Flow[RequestContext].map { rc => rc.withResponseHeader(RawHeader("keyPreOutbound", "valPreOutbound")) }
BidiFlow.fromFlows(inbound, outbound)
}
}

class PostFlow extends PipelineFlowFactory {

override def create(context: Context)(implicit system: ActorSystem): PipelineFlow = {
val inbound = Flow[RequestContext].map { rc => rc.addRequestHeaders(RawHeader("keyPostInbound", "valPostInbound")) }
val outbound = Flow[RequestContext].map { rc => rc.addResponseHeaders(RawHeader("keyPostOutbound", "valPostOutbound")) }
val inbound = Flow[RequestContext].map { rc => rc.withRequestHeader(RawHeader("keyPostInbound", "valPostInbound")) }
val outbound = Flow[RequestContext].map { rc => rc.withResponseHeader(RawHeader("keyPostOutbound", "valPostOutbound")) }
BidiFlow.fromFlows(inbound, outbound)
}
}
6 changes: 5 additions & 1 deletion squbs-pipeline/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@ libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-testkit" % akkaV % "test",
"ch.qos.logback" % "logback-classic" % logbackInTestV % "test"
)
enablePlugins(de.johoop.testngplugin.TestNGPlugin)

(testOptions in Test) += Tests.Argument(TestFrameworks.ScalaTest, "-h", "report/squbs-pipeline")
testOptions in Test ++= Seq(
Tests.Argument(TestFrameworks.ScalaTest, "-h", "report/squbs-pipeline"),
Tests.Argument(TestFrameworks.JUnit, "-v", "-a")
)

updateOptions := updateOptions.value.withCachedResolution(true)
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017 PayPal
* Copyright 2018 PayPal
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,10 +18,9 @@ package org.squbs.pipeline

import akka.NotUsed
import akka.actor._
import akka.stream.scaladsl._
import akka.stream.{javadsl, scaladsl}
import com.typesafe.config.ConfigObject

import scala.annotation.tailrec
import scala.util.Try

sealed trait PipelineType
Expand All @@ -30,13 +29,35 @@ case object ClientPipeline extends PipelineType

case class Context(name: String, pipelineType: PipelineType)

trait PipelineFlowFactory {
package japi {

import akka.stream.javadsl.BidiFlow

/**
* Java API
*/
trait PipelineFlowFactory {

def create(context: Context, system: ActorSystem):
javadsl.BidiFlow[RequestContext, RequestContext, RequestContext, RequestContext, NotUsed]

def abortable(flow: BidiFlow[RequestContext, RequestContext, RequestContext, RequestContext, NotUsed]):
javadsl.BidiFlow[RequestContext, RequestContext, RequestContext, RequestContext, NotUsed] =
AbortableBidiFlow(flow.asScala).abortable.asJava
}

}

trait PipelineFlowFactory extends japi.PipelineFlowFactory {

def create(context: Context)(implicit system: ActorSystem):
BidiFlow[RequestContext, RequestContext, RequestContext, RequestContext, NotUsed]
scaladsl.BidiFlow[RequestContext, RequestContext, RequestContext, RequestContext, NotUsed]

override def create(context: Context, system: ActorSystem):
javadsl.BidiFlow[RequestContext, RequestContext, RequestContext, RequestContext, NotUsed] = create(context)(system).asJava
}

class PipelineExtensionImpl(flowFactoryMap: Map[String, PipelineFlowFactory],
class PipelineExtensionImpl(flowFactoryMap: Map[String, japi.PipelineFlowFactory],
serverDefaultFlows: (Option[String], Option[String]),
clientDefaultFlows: (Option[String], Option[String]))
(implicit system: ActorSystem) extends Extension {
Expand All @@ -46,36 +67,30 @@ class PipelineExtensionImpl(flowFactoryMap: Map[String, PipelineFlowFactory],
val (appFlow, defaultsOn) = pipelineSetting

val (defaultPreFlow, defaultPostFlow) =
if(defaultsOn getOrElse true) {
if (defaultsOn getOrElse true) {
context.pipelineType match {
case ServerPipeline => serverDefaultFlows
case ClientPipeline => clientDefaultFlows
}
} else (None, None)

val pipelineFlowNames = (defaultPreFlow :: appFlow :: defaultPostFlow :: Nil).flatten

if(pipelineFlowNames.isEmpty) None
else buildPipeline(pipelineFlowNames, context)
buildPipeline(pipelineFlowNames, context)
}

private def buildPipeline(flowNames: Seq[String], context: Context) = {

val flows = flowNames.toList collect { case name =>
val flowFactory = flowFactoryMap.getOrElse(name, throw new IllegalArgumentException(s"Invalid pipeline name $name"))
flowFactory.create(context)
}

@tailrec
def connectFlows(currentFlow: PipelineFlow, flowList: List[PipelineFlow]): PipelineFlow = {
val flows = flowNames map { case name =>
val flowFactory = flowFactoryMap
.getOrElse(name, throw new IllegalArgumentException(s"Invalid pipeline name $name"))

flowList match {
case Nil => currentFlow
case head :: tail => connectFlows(currentFlow atop head, tail)
flowFactory match {
case factory: PipelineFlowFactory => factory.create(context)
case factory: japi.PipelineFlowFactory => factory.create(context, system).asScala
case factory => throw new IllegalArgumentException(s"Unsupported flow factory type ${factory.getClass.getName}")
}
}

Some(connectFlows(flows.head, flows.tail))
flows.reduceLeftOption(_ atop _)
}
}

Expand All @@ -91,11 +106,11 @@ object PipelineExtension extends ExtensionId[PipelineExtensionImpl] with Extensi
(n, v.toConfig)
}

var flowMap = Map.empty[String, PipelineFlowFactory]
var flowMap = Map.empty[String, japi.PipelineFlowFactory]
flows foreach { case (name, config) =>
val factoryClassName = config.getString("factory")

val flowFactory = Class.forName(factoryClassName).newInstance().asInstanceOf[PipelineFlowFactory]
val flowFactory = Class.forName(factoryClassName).newInstance().asInstanceOf[japi.PipelineFlowFactory]

flowMap = flowMap + (name -> flowFactory)
}
Expand Down
Loading

0 comments on commit 82ef1b1

Please sign in to comment.