Skip to content

Commit

Permalink
Minor changes and demo video added (RxTwitterStats)
Browse files Browse the repository at this point in the history
  • Loading branch information
farquet committed Mar 13, 2015
1 parent bc2fd1d commit 851bf14
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 40 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ Then, we see that Iteratees and Enumerators are deeply integrated with the Play

* __RxTime__ : the simplest possible app that could serve as a template project. This is a simple web page with a single button. When you click on it, this creates an Observable on the server that will produce the time every second. You can stop the clock (i. e. unsubscribe from the Observable) and start it again (new subscription).

* __RxTwitterStats__ : this is a complex app illustrating the power of RxPlay combined with WidgetManager (several Observables, chaining them, buffering, etc). This will show real-time tweets for a specific keyword. Top 3 mentionned users and number of tweets per second are displayed on screen. See Readme from the project directory for details and configuration (to add your Twitter tokens).
* __RxTwitterStats__ : this is a complex app illustrating the power of RxPlay combined with WidgetManager (several Observables, chaining them, buffering, etc). This will show real-time tweets for a specific keyword. Top 3 mentionned users and number of tweets per second are displayed on screen. See Readme from the project directory for details and configuration (to add your Twitter tokens) or check out this live demo on Youtube :
[![RxTwitterStats demo](http://img.youtube.com/vi/G70KP-A3AT8/0.jpg)](http://www.youtube.com/watch?v=G70KP-A3AT8)

### the necessary files

Expand Down
40 changes: 14 additions & 26 deletions RxTime/app/controllers/Application.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,44 +20,32 @@ object Application extends Controller {
* Creates a WebSocket using WidgetManager
*/
def socket = WebSocket.using[String] { request =>

val timeObs = Observable.interval(1 second).map(_ =>
new SimpleDateFormat("HH:mm:ss").format(Calendar.getInstance.getTime)
)


val timeObs = Observable.interval(1 second).map(_ => new SimpleDateFormat("HH:mm:ss").format(Calendar.getInstance.getTime))

def processClientData(func: String, arg:String, m:WidgetManager) = {
func match { // matching command received from client

case "start" =>
m.getObservable("time") match {
case Some(_) =>
println("starting clock")
// subscribing to the observable by just pushing data in the webSocket
// (this will automatically call timeUpdate JS callback on the client side)
m.subscribePush("time")
case None =>
println("Warning : \"time\" Observable doesn't exist.")
}

func match {
case "start" =>
m.subscribePush("time")
case "stop" =>
println("stopping clock")
m.unsubscribe("time")

case _ =>
println("Unrecognized input <"+func+":"+arg+">")
case _ =>
println("unrecognized token")
}
}

// function that will clean the app when client disconnect
def onClientClose(m: WidgetManager) = {

def onClientClose(m:WidgetManager) = {
m.unsubscribe("time")
m.close // closing manager
}

val manager = new WidgetManager(processClientData, onClientClose)

// adding the time observable to the widgetManager

manager.addObservable("time", timeObs)

// return the websocket
manager.webSocket
}

Expand Down
2 changes: 1 addition & 1 deletion RxTime/app/views/index.scala.html
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
var pathArray = window.location.pathname.split( '/' );
pathArray[pathArray.length-1] = "socket";
var path = pathArray.join("/");
url= "ws://"+window.location.host+path;
url="ws://"+window.location.host+path;

// creating WidgetManager
var widgetManager = new WidgetManager(url);
Expand Down
32 changes: 21 additions & 11 deletions RxTwitterStats/app/controllers/Application.scala
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,8 @@ object Application extends Controller {

val bufferedReader = new BufferedReader(new InputStreamReader(is, "UTF-8"))

def closeReader: Unit = {
println("App is closing Twitter stream.")

if (bufferedReader.ready)
bufferedReader.close
}

Observable { sub: Subscriber[String] =>

try {
def getTweet:Unit = {
val f = Future[String] {
Expand All @@ -91,13 +84,13 @@ object Application extends Controller {
getTweet // fetching the next value
} else {
println("Unsubscribed from Twitter stream.")
closeReader
bufferedReader.close
}
}
case Failure(e) => {
println("Error with Twitter stream : "+e.getMessage())
sub.onError(e) // propagating the error
closeReader
bufferedReader.close
}
}
}
Expand All @@ -106,10 +99,11 @@ object Application extends Controller {
} catch {
case e : Throwable => {
sub.onError(e) // passing the error to the Observable
bufferedReader.close
System.err.println("Error : " + e.getMessage())
}
}
}.filter(_.length > 0) // deleting keep-alive data from Twitter
}.filter(_.length > 0) // deleting keep-alive messages from Twitter
}

/**
Expand Down Expand Up @@ -161,13 +155,18 @@ object Application extends Controller {

case "keywordChanged" if (arg.length > 0) =>
submit.onNext(resetObs) // to notify subscribers that we change twitter stream

// waiting to avoid starting a new feed too early and get banned by Twitter (status 420)
Thread.sleep(1000)

twitterFeedKeyword(arg) match {
case Some(obs) => {
m.send("twitter", "Awaiting tweets for keyword : "+xml.Utility.escape(arg))
submit.onNext(obs) // adding the feed to the Subject of Observable
}
case None => m.send("twitter", "Error setting the Twitter feed...")
}

case _ =>
println("Unrecognized input <"+func+":"+arg+">")
}
Expand All @@ -188,10 +187,17 @@ object Application extends Controller {
m.close
}

// creating WidgetManager
val manager = new WidgetManager(processClientData, onClientClose)

/*
* Creating all Observables
*/

val ctrObs = tweets.scan(0)((ctr, tweet) => if (tweet == "RESET") 0 else ctr + 1).map(_.toString)

val speedObs = tweets.buffer(1 second).filter(_.length > 0).filter(_ != "RESET").map(_.length.toString)

val top3Obs = tweets.scan(Map.empty[String,Int])((m, t) => {
if (t == "RESET") {
Map.empty // cleaning the mentions ranking if we change feed
Expand All @@ -215,6 +221,10 @@ object Application extends Controller {
top3.map(el => el._1+","+el._2).mkString(";")
}).buffer(100 milliseconds).filter(_.length > 0).map(_.head)

/*
* Adding Observables to WidgetManager and subscribing to them
*/

// sends tweet text to client extracted from json
manager.addObservable("tweets", tweets.buffer(100 milliseconds).filter(_.length > 0).map(_.head))
manager.subscribe("tweets", { jsonTweet:String =>
Expand Down
3 changes: 2 additions & 1 deletion RxTwitterStats/app/views/index.scala.html
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@

@main {
<div id="content">

Expand All @@ -18,7 +19,7 @@
<div id="counterBox">
<br />
<p>Number of live tweets : <span id="counter">0</span></p>
<br />
<br />
</div>

<div id="mentionsRank">
Expand Down
1 change: 1 addition & 0 deletions RxTwitterStats/public/stylesheets/main.css
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ a:hover {
border: 0px;
background: #ffc;
pading: 3px;
margin-left:12px;
height: 28px;
color: #555555;
font-size:1.2em;
Expand Down

0 comments on commit 851bf14

Please sign in to comment.