Skip to content

Commit

Permalink
made message a type instead of just Json
Browse files Browse the repository at this point in the history
  • Loading branch information
ericmittelhammer committed Apr 9, 2014
1 parent f879fc5 commit 1534afc
Show file tree
Hide file tree
Showing 9 changed files with 93 additions and 33 deletions.
2 changes: 1 addition & 1 deletion app/actors/MessageStream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ trait MessageStream

object MessageStream {

type Message = JsValue
case class Message(timestamp: java.util.Date, author: String, message: String)

/**
* Creates a MessageStream.
Expand Down
3 changes: 2 additions & 1 deletion app/actors/OfflineMessageStream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ class OfflineMessageStream(
// reset the iterator if we've reached the end of the list
if (!i.hasNext) i = messageList.iterator
val nextMsg = i.next
supervisor ! SocketEndpoint.NewMessage(nextMsg) //send the message to the supervisor
//send the message to the supervisor
supervisor ! SocketEndpoint.NewMessage(nextMsg.copy(timestamp = new java.util.Date()))
val nextMessageAt =
scala.util.Random.nextInt(
(maxMilliseconds - minMilliseconds) + 1) + minMilliseconds
Expand Down
49 changes: 43 additions & 6 deletions app/actors/SocketEndpoint.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,17 @@ package actors
import play.api.libs.iteratee.{ Iteratee, Enumerator, Concurrent }
import play.api.libs.concurrent.Execution.Implicits._

import play.api.libs.json.{ JsValue, JsString }
import play.api.libs.json.{ JsValue, JsString, JsPath, JsResult, JsSuccess, JsError }
import play.api.libs.json.{ Reads, Writes }
import play.api.libs.json.Reads._
import play.api.libs.functional.syntax._

import akka.actor.{ Props, ActorRef, Actor, ActorRefFactory }
import akka.event.{ Logging, LoggingReceive }

import java.util.Date
import java.text.DateFormat

object SocketEndpoint {

/**
Expand All @@ -23,6 +29,23 @@ object SocketEndpoint {

def props(supervisor: ActorRef) = Props(classOf[SocketEndpoint], supervisor)

// used to convert from raw json coming from the socket into the message type
val inputReads: Reads[MessageStream.Message] = (
(JsPath \ "author").read[String] and
(JsPath \ "message").read[String]
//partially apply the case class apply function with the current timestamp
)(MessageStream.Message.apply(new java.util.Date(), _: String, _: String))

def inputToMessageResult(payload: JsValue): JsResult[MessageStream.Message] =
payload.validate[MessageStream.Message](inputReads)

// will convert a message back to json to be written to the stream
val messageWrites: Writes[MessageStream.Message] = (
// write the timestamp out in RFC2822 time
(JsPath \ "timestamp").write[Date](Writes.dateWrites("EEE, d MMM yyyy HH:mm:ss Z")) and
(JsPath \ "author").write[String] and
(JsPath \ "message").write[String]
)(unlift(MessageStream.Message.unapply))
}

/**
Expand All @@ -34,23 +57,36 @@ object SocketEndpoint {
*/
class SocketEndpoint(supervisor: ActorRef) extends Actor {

import SocketEndpoint._

val log = Logging(context.system, this)

// create the enumerator and the channel that will push to it
val (out, channel) = Concurrent.broadcast[JsValue]

var in: Iteratee[JsValue, Unit] = _

var filterString: Option[String] = None

def receive = LoggingReceive {

case Supervisor.NewSocket(name: Option[String]) => {

// create the iteratee that will handle incoming data from the websocket
in = Iteratee.foreach[JsValue] { msg =>
var in: Iteratee[JsValue, Unit] = Iteratee.foreach[JsValue] { msg =>

msg \ "messageType" match {
case JsString("newMessage") => supervisor ! SocketEndpoint.NewMessage((msg \ "payload").as[JsValue])

case JsString("newMessage") =>

//try to parse the json into a Message
inputToMessageResult((msg \ "payload").as[JsValue]) match {

// if it validates, send the message to the supervisor
case s: JsSuccess[MessageStream.Message] => supervisor ! SocketEndpoint.NewMessage(s.get)
case e: JsError => // if there was a parsing error, do nothing
}

case JsString("filter") => filterString = Some((msg \ "value").as[String])

case _ => Unit
}
}.map { _ => // this will map over the Iteratee once it has received EOF
Expand All @@ -72,7 +108,8 @@ class SocketEndpoint(supervisor: ActorRef) extends Actor {

def connected: Receive = LoggingReceive {

case SocketEndpoint.NewMessage(message: MessageStream.Message) => channel.push(message)
case SocketEndpoint.NewMessage(message: MessageStream.Message) =>
channel.push(messageWrites.writes(message))

case Supervisor.NewSocket(name: Option[String]) => log.warning("already connected")
}
Expand Down
2 changes: 1 addition & 1 deletion app/actors/Supervisor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ object Supervisor {

/**
* Supervisor that will route messages between the MessageStream and SocketEndpoints
* @param messageStream a factory funtction to create MessageStream actors.
* @param messageStream a factory function to create MessageStream actors.
* The funciton accepts a reference to it's supervisor
*/
class Supervisor(messageStreamFactory: MessageStreamFactory,
Expand Down
32 changes: 17 additions & 15 deletions app/actors/TwitterMessageStream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,51 +11,53 @@ import akka.event.{ EventStream, Logging, LoggingReceive }

import scala.concurrent.duration._

object TwitterMessageStream{
/*object TwitterMessageStream {
def tweetToMessage(tweet: JsObject): JsObject = {
}
}
class TwitterMessageStream(
supervisor: ActorRef,
url: String,
consumerKey: String,
supervisor: ActorRef,
url: String,
consumerKey: String,
accessToken: String) extends Actor with MessageStream {
val req = WS.url(url).withRequestTimeout(-1).sign(OAuthCalculator(consumerKey, accessToken))
// iteratee that is used to turn the streaming API
// into individual messages
val iteratee = Iteratee.foreach[Array[Byte]] { chunk => {

val chunkedString = new String(chunk, "UTF-8")
val iteratee = Iteratee.foreach[Array[Byte]] { chunk =>
val chunkedString = new String(chunk, "UTF-8")
val json = Json.parse(chunkedString)
val json = Json.parse(chunkedString)
(json \ "id_str").asOpt[String].map { id => WS.url(elasticTweetURL + id).put(json) }
matchAndPush(json)
json.asOpt[String].map { tweet =>
supervisor ! SocketEndpoint.NewMessage(tweetToMessage(tweet))
}
}
val log = Logging(context.system, this)
def stopped: Receive = {
case StartStream => {
log.info("Stream Started")
log.info("TwitterStream Started")
context.become(started)
}
case StopStream => log.warning("Stream already stopped")
}
def started: Receive = LoggingReceive {
def started: Receive = {
case StartStream => log.warning("Stream already started")
case StopStream => context.become(stopped)
}
override def receive = stopped
}
}*/
8 changes: 7 additions & 1 deletion app/controllers/Application.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,14 @@ object Application extends Controller {

val actorSystem = ActorSystem("reactive")

val testMessages = List(
MessageStream.Message(timestamp = new java.util.Date(), author = "Author1", message = "Message1"),
MessageStream.Message(timestamp = new java.util.Date(), author = "Author2", message = "Message2"),
MessageStream.Message(timestamp = new java.util.Date(), author = "Author3", message = "Message3")
)

val messageStreamFactory = (s: ActorRef, context: ActorRefFactory) => {
val props = OfflineMessageStream.props(s, List(JsString("one"), JsString("two"), JsString("three")), 500, 1000)
val props = OfflineMessageStream.props(s, testMessages, 500, 1000)
context.actorOf(props, "OfflineMessageStream")
}

Expand Down
8 changes: 7 additions & 1 deletion test/actors/MessageStreamSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,13 @@ class OfflineMessageStreamSpec extends TestKit(ActorSystem("OMSSystem",
shutdown(system)
}

val oms = system.actorOf(OfflineMessageStream.props(self, List(JsString("one"), JsString("two"), JsString("three")), 50, 3000), "oms")
val testMessages = List(
MessageStream.Message(timestamp = new java.util.Date(), author = "Author1", message = "Message1"),
MessageStream.Message(timestamp = new java.util.Date(), author = "Author2", message = "Message2"),
MessageStream.Message(timestamp = new java.util.Date(), author = "Author3", message = "Message3")
)

val oms = system.actorOf(OfflineMessageStream.props(self, testMessages, 50, 3000), "oms")

"an unstarted OfflineMessageStream Actor" should {
"not publish any messages" in {
Expand Down
14 changes: 10 additions & 4 deletions test/actors/SocketEndpontSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,16 @@ class SocketEndpointSpec extends TestKit(ActorSystem("SocketEndpointSystem",

val f = socketActor.out.run(i)

val msg = MessageStream.Message(timestamp = new java.util.Date(), author = "Author1", message = "Hello")

// send a message to the socket
socket ! SocketEndpoint.NewMessage(JsString("hello"))
socket ! SocketEndpoint.NewMessage(msg)

socketActor.channel.end

// verify the message was processed by the iteratee
whenReady(f) { result =>
result should equal(List(JsString("hello")))
result should equal(List(SocketEndpoint.messageWrites.writes(msg)))
}

}
Expand Down Expand Up @@ -96,15 +98,19 @@ class SocketEndpointSpec extends TestKit(ActorSystem("SocketEndpointSystem",

val msg = Json.obj(
"messageType" -> "newMessage",
"payload" -> "here's the payload"
"payload" -> Json.obj("author" -> "testAuthor", "message" -> "hello")
)

whenReady(f) { result =>

// send a Json message to the iterator
Enumerator[JsValue](msg).run(result._1)

expectMsg(SocketEndpoint.NewMessage(JsString("here's the payload")))
// validate that it was parsed and then sent as a message
// we have to use a partial function here because the timestamp will be set by the SocketEndpoint
expectMsgPF() {
case m: SocketEndpoint.NewMessage if (m.message.author == "testAuthor" && m.message.message == "hello") => true
}
}

}
Expand Down
8 changes: 5 additions & 3 deletions test/actors/SupervisorSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,13 @@ class SupervisorSpec extends TestKit(ActorSystem("SupervisorSystem",

supervisor.sockets = ParSet(socket1.ref, socket2.ref)

supervisorRef ! SocketEndpoint.NewMessage(JsString("hello"))
val msg = MessageStream.Message(timestamp = new java.util.Date(), author = "Author1", message = "Hello")

socket1.expectMsg(SocketEndpoint.NewMessage(JsString("hello")))
supervisorRef ! SocketEndpoint.NewMessage(msg)

socket2.expectMsg(SocketEndpoint.NewMessage(JsString("hello")))
socket1.expectMsg(SocketEndpoint.NewMessage(msg))

socket2.expectMsg(SocketEndpoint.NewMessage(msg))
}

"send a StopStream message when its final socket has been closed" in {
Expand Down

0 comments on commit 1534afc

Please sign in to comment.