Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Directives body opt #189

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions core/src/main/scala/com/spingo/op_rabbit/ConnectionParams.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import com.rabbitmq.client.SaslConfig
import com.rabbitmq.client.impl.DefaultExceptionHandler
import com.typesafe.config.Config
import javax.net.SocketFactory
import javax.net.ssl.SSLContext

import scala.collection.JavaConverters._
import scala.util.Try

Expand Down Expand Up @@ -39,7 +41,8 @@ case class ConnectionParams(
saslConfig: SaslConfig = DefaultSaslConfig.PLAIN,
sharedExecutor: Option[java.util.concurrent.ExecutorService] = None,
shutdownTimeout: Int = ConnectionFactory.DEFAULT_SHUTDOWN_TIMEOUT,
socketFactory: SocketFactory = SocketFactory.getDefault
socketFactory: SocketFactory = SocketFactory.getDefault,
sslContextOpt: Option[SSLContext] = None
) {
// TODO - eliminate ClusterConnectionFactory after switching to use RabbitMQ's topology recovery features.
protected [op_rabbit] def applyTo(factory: ClusterConnectionFactory): Unit = {
Expand All @@ -58,7 +61,16 @@ case class ConnectionParams(
sharedExecutor.foreach(factory.setSharedExecutor)
factory.setShutdownTimeout(shutdownTimeout)
factory.setSocketFactory(socketFactory)
if (ssl) factory.useSslProtocol()
if (ssl) {
sslContextOpt match {
case Some(sslContext) =>
// suitable for production
factory.useSslProtocol(sslContext)
case None =>
// suitable for development
factory.useSslProtocol()
}
}
}
}

Expand Down
22 changes: 22 additions & 0 deletions core/src/main/scala/com/spingo/op_rabbit/Directives.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import scala.language.implicitConversions
import shapeless._
import com.spingo.op_rabbit.Binding._
import com.spingo.op_rabbit.Exchange.ExchangeType
import EnhancedTry._

import scala.util.{Failure, Success, Try}

Expand Down Expand Up @@ -179,6 +180,27 @@ trait Directives {
}
}

/**
Extract the message body as a Either. Uses a [[com.spingo.op_rabbit.RabbitUnmarshaller RabbitUnmarshaller]] to deserialize.
In case the body cannot be unmarshalled, the exception is present in the Left of the Either.
In this way the client code can have the unmarshalling error reason.
Example:

{{{
bodyOpt(as[JobDescription]) { jobDescriptionEither => ...
}
}}}
*/
def bodyEither[T](um: RabbitUnmarshaller[T]): Directive1[Either[Throwable, T]] = new Directive1[Either[Throwable, T]] {
def happly(fn: ::[Either[Throwable, T], HNil] => Handler): Handler = { (promise, delivery) =>
val dataTry = Try {
um.unmarshall(delivery.body, Option(delivery.properties.getContentType), Option(delivery.properties.getContentEncoding))
}

fn(dataTry.toEither :: HNil)(promise, delivery)
}
}

/**
Extract any arbitrary value from the delivery / Java RabbitMQ objects. Accepts a function which receives a Delivery and returns some value.
*/
Expand Down
10 changes: 10 additions & 0 deletions core/src/main/scala/com/spingo/op_rabbit/EnhancedTry.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.spingo.op_rabbit

import scala.util.{Success, Try}

object EnhancedTry {
implicit class EnhancedTryImpl[T](t: Try[T]) {
def toEither: Either[Throwable, T] =
t.transform(success => Success(Right(success)), exception => Success(Left(exception))).get
}
}
23 changes: 23 additions & 0 deletions core/src/test/scala/com/spingo/op_rabbit/DirectivesSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,29 @@ class DirectivesSpec extends FunSpec with Matchers with Inside {
}
} should be (acked)
}

it("yields the value for both directives when one is bodyEither") {
val delivery = testDelivery(body = "hi".getBytes, properties = Seq(ReplyTo("place")))
resultFor(delivery) {
(bodyEither(as[String]) & property(ReplyTo)) { (bodyEither, replyTo) =>
replyTo should be ("place")
bodyEither.right.get should be ("hi")
ack
}
} should be (acked)
}
}

describe("bodyEither") {
it("yields the value as an option") {
val delivery = testDelivery(body = "hi".getBytes)
resultFor(delivery) {
(bodyEither(as[String])) { (bodyEither) =>
bodyEither.right.get should be ("hi")
ack
}
} should be (acked)
}
}

describe("|") {
Expand Down
2 changes: 1 addition & 1 deletion project/version.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=2.1.0
version=2.1.0