Twitter Sentiment Project #2

Open · wants to merge 12 commits into master
48 changes: 48 additions & 0 deletions twittter-sentiment/README.md
@@ -0,0 +1,48 @@
# Twitter Sentiment Project

# Prerequisites
1. Download and install Node.js.
2. Download and run Apache Spark.
3. Download and run Apache Kafka along with Apache Zookeeper.
4. Download and install SBT.

# Clone the project to your local machine
git clone https://github.com/ManjunathKmph/twitter-sentiment.git

# Project Design
![Alt Project Design](/images/ProjectDesign.png "Project Design")

# Steps to prepare the project
1. Download the Stanford NLP models jar from the link below and copy it into the twitter-sentiment/jars folder.
* https://drive.google.com/file/d/0B89AibnP3IZzcUhmZ2o3Q2lmM2M/view?usp=sharing
2. Open the file src/main/resources/application.properties and set values for the following keys:
* twitter4j.oauth.consumerKey
* twitter4j.oauth.consumerSecret
* twitter4j.oauth.accessToken
* twitter4j.oauth.accessTokenSecret
* bootstrap.servers.config
3. Create the following Kafka topics:
* kafka-topics.sh --create --topic twitterTopic --zookeeper localhost:2181 --replication-factor 1 --partitions 1
* kafka-topics.sh --create --topic sentimentTopic --zookeeper localhost:2181 --replication-factor 1 --partitions 1
4. Change the Spark home path in the following file:
* start-streaming.sh
5. Make the scripts executable:
* chmod +x start-server.sh
* chmod +x start-streaming.sh

# Steps to run the project
1. Open a terminal and run:
* ./start-server.sh -- once the server is up and running, the console prints the URL localhost:8888/web
2. Open http://localhost:8888/web in the browser; it should show the following screen.
![Alt Initial Screen](/images/initialscreen.png "Initial Screen")
3. Open another terminal and run:
* ./start-streaming.sh
4. Refresh the browser; it should show the following screen.
![Alt Initial Streaming Screen](/images/second.png "Initial Streaming Screen")

# How to start sentiment analysis for a hash tag
1. In the browser, type a hash tag name (e.g. food) in the textbox and click on Show Tweets. It should show the following screen.
![Alt Sentiment Analysis Screen](/images/third.png "Sentiment Analysis Screen")

Note:- The code searches the last 60 seconds of tweets for the given hash tag; for each matching tweet it runs sentiment analysis using the Stanford NLP library and marks the tweet as positive/negative/neutral/notunderstood. If no sentiment graph appears for the hash tag, wait until someone uses it within a 60-second window.

21 changes: 21 additions & 0 deletions twittter-sentiment/build.sbt
@@ -0,0 +1,21 @@
name := "twitter-sentiment"
version := "1.0"
scalaVersion := "2.11.8"

libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.0.2" % "provided"
libraryDependencies += "org.apache.spark" % "spark-mllib_2.11" % "2.0.2" % "provided"
libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % "2.0.2" % "provided"
libraryDependencies += "org.apache.bahir" % "spark-streaming-twitter_2.11" % "2.0.1"
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.9.0.0"
libraryDependencies += "edu.stanford.nlp" % "stanford-corenlp" % "3.6.0" % "provided"
libraryDependencies += "edu.stanford.nlp" % "stanford-corenlp" % "3.6.0" % "provided" classifier "models"
libraryDependencies += "com.typesafe" % "config" % "1.3.1"
libraryDependencies += "org.scalatest" % "scalatest_2.11" % "3.0.1" % "test"

crossPaths := false


assemblyMergeStrategy in assembly := {
case PathList("META-INF", xs @ _*) => MergeStrategy.discard
case x => MergeStrategy.first
}
Binary file added twittter-sentiment/images/ProjectDesign.png
Binary file added twittter-sentiment/images/initialscreen.png
Binary file added twittter-sentiment/images/second.png
Binary file added twittter-sentiment/images/third.png
Binary file added twittter-sentiment/jars/ejml-0.23.jar
18 changes: 18 additions & 0 deletions twittter-sentiment/package.json
@@ -0,0 +1,18 @@
{
"name": "twitter-sentiment",
"version": "1.0.0",
"description": "Twitter Sentiment Project",
"dependencies": {
"express": "4.10.2",
"socket.io": "1.7.2",
"kafka-node": "0.4.0"
},
"main": "web/server.js",
"private": true,
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1",
"start": "node web/server.js"
},
"author": "Manjunath Kempaiah",
"license": "BSD"
}
1 change: 1 addition & 0 deletions twittter-sentiment/project/assembly.sbt
@@ -0,0 +1 @@
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.12.0")
22 changes: 22 additions & 0 deletions twittter-sentiment/src/main/resources/application.properties
@@ -0,0 +1,22 @@
#########################################################################################################
############## Twitter Streaming Project ##############
############## Configuration Parameters ##############
#########################################################################################################
##Spark related configuration parameters
master.mode=local[*]
application.name=Twitter Streaming Application
log.level=WARN

##Twitter related configuration parameters
twitter4j.oauth.consumerKey=
twitter4j.oauth.consumerSecret=
twitter4j.oauth.accessToken=
twitter4j.oauth.accessTokenSecret=
twitter.streaming.topic=twitterTopic
twitter.sentiment.topic=sentimentTopic

##Apache Kafka related configuration parameters
bootstrap.servers.config=ip-172-31-13-154.ec2.internal:6667
client.id.config=TweetsProducer
key.serializer.class.config=org.apache.kafka.common.serialization.StringSerializer
value.serializer.class.config=org.apache.kafka.common.serialization.StringSerializer
@@ -0,0 +1,37 @@
package com.manju.twitter

import java.util.Properties
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord

/**
* @author Manjunath Kempaiah
* @version 1.0
*
* Class uses the kafka api to send the message
* to the specific topics.
*
*/
class KafkaTweetProducer(props: Properties) {

  val producer = new KafkaProducer[String, String](props)

  /*
   * Method will send the message to the specific kafka topic.
   *
   * topic -- Topic name.
   * message -- Message to publish.
   */
  def sendMessage(topic: String, message: String) {
    val data = new ProducerRecord[String, String](topic, null, message)
    producer.send(data)
  }

  /*
   * Method will close the producer connection to the Kafka cluster.
   */
  def closeProducer() {
    producer.close()
  }

}
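A minimal usage sketch (not part of the PR) of this producer. The broker address `localhost:9092` and the example payload are assumptions for illustration; the real application builds its properties from application.properties:

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.ProducerConfig

object ProducerExample {
  def main(args: Array[String]) {
    // Assumes a Kafka broker listening on localhost:9092 (hypothetical).
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaTweetProducer(props)
    // Send one example message to the hash-tag topic, then release the connection.
    producer.sendMessage("twitterTopic", "{\"tweets\":[{\"tag\":\"#food\", \"count\":3}]}")
    producer.closeProducer()
  }
}
```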
@@ -0,0 +1,15 @@
package com.manju.twitter

/**
* @author Manjunath Kempaiah
* @version 1.0
*
* Sentiment types used in the sentiment analysis, modelled as case objects of a sealed trait.
*
*/
sealed trait SENTIMENT_TYPE
case object NEGATIVE extends SENTIMENT_TYPE
case object NEUTRAL extends SENTIMENT_TYPE
case object POSITIVE extends SENTIMENT_TYPE
case object VERY_POSITIVE extends SENTIMENT_TYPE
case object NOT_UNDERSTOOD extends SENTIMENT_TYPE
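Because `SENTIMENT_TYPE` is sealed, the compiler can verify that a pattern match covers every case. A small hypothetical helper (not part of the PR) illustrating this:

```scala
// Hypothetical sketch: an exhaustive match over the sealed trait.
// Dropping any case below would produce a compiler warning.
def label(s: SENTIMENT_TYPE): String = s match {
  case NEGATIVE       => "negative"
  case NEUTRAL        => "neutral"
  case POSITIVE       => "positive"
  case VERY_POSITIVE  => "very positive"
  case NOT_UNDERSTOOD => "not understood"
}
```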
@@ -0,0 +1,66 @@
package com.manju.twitter

import java.util.Properties
import edu.stanford.nlp.pipeline.StanfordCoreNLP
import edu.stanford.nlp.ling.CoreAnnotations
import edu.stanford.nlp.sentiment.SentimentCoreAnnotations

import scala.collection.JavaConversions._
import edu.stanford.nlp.neural.rnn.RNNCoreAnnotations
import edu.stanford.nlp.sentiment.SentimentCoreAnnotations.SentimentAnnotatedTree

/**
* @author Manjunath Kempaiah
* @version 1.0
*
* Class does the sentiment analysis on the tweets using stanford nlp library.
*
*/
object SentimentAnalyser {

  // The CoreNLP pipeline is expensive to build, so create it once and reuse it.
  lazy val pipeline: StanfordCoreNLP = {
    val props = new Properties()
    props.setProperty("annotators", "tokenize, ssplit, pos, lemma, parse, sentiment")
    new StanfordCoreNLP(props)
  }

  /*
   * Maps the Stanford sentiment class to our custom sentiment value.
   */
  def fetchSentimentType(sentiment: Int): String = sentiment match {
    case 1 => NEGATIVE.toString()
    case 2 => NEUTRAL.toString()
    case 3 => POSITIVE.toString()
    case 4 => VERY_POSITIVE.toString()
    case _ => NOT_UNDERSTOOD.toString() // 0 and any out-of-range value
  }

  /*
   * Runs sentiment analysis on a tweet using the Stanford NLP library and
   * returns the sentiment of the longest sentence in the message.
   */
  def detectSentiment(message: String): String = {
    // A blank message cannot be analysed; report it as not understood.
    if (message == null || message.trim.isEmpty) {
      return NOT_UNDERSTOOD.toString()
    }

    // Call the Stanford NLP api to process the message.
    val annotation = pipeline.process(message)

    var sentiment = 0
    var longest = 0

    // Keep the predicted sentiment of the longest sentence.
    for (sentence <- annotation.get(classOf[CoreAnnotations.SentencesAnnotation])) {
      val tree = sentence.get(classOf[SentimentCoreAnnotations.SentimentAnnotatedTree])
      val tempSentiment = RNNCoreAnnotations.getPredictedClass(tree)
      val partText = sentence.toString
      if (partText.length() > longest) {
        sentiment = tempSentiment
        longest = partText.length()
      }
    }
    fetchSentimentType(sentiment)
  }

}
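A minimal sketch (not part of the PR) of calling the analyser directly, assuming the stanford-corenlp jar and its models jar are on the classpath:

```scala
object SentimentExample {
  def main(args: Array[String]) {
    // The predicted class depends on the loaded model, so no output is asserted here.
    println(SentimentAnalyser.detectSentiment("I love this restaurant, the food is great!"))
    // Blank input is reported as not understood without invoking the pipeline.
    println(SentimentAnalyser.detectSentiment(""))
  }
}
```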
@@ -0,0 +1,109 @@
package com.manju.twitter


import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import scala.sys.SystemProperties
import org.apache.spark.streaming.twitter.TwitterUtils
import java.util.Properties
import org.apache.kafka.clients.producer.ProducerConfig

import com.manju.twitter.SentimentAnalyser._
import com.typesafe.config.ConfigFactory

/**
* @author Manjunath Kempaiah
* @version 1.0
*
 * Connects to Twitter through the Spark streaming and Twitter streaming APIs, computes the ten most
 * popular hash tags over the last 60 seconds, and runs sentiment analysis on the tweets using the Stanford NLP library.
*
*/
object TwitterStreaming {

  val appConf = ConfigFactory.load()

  /*
   * Method sets the system properties required by the twitter4j api.
   */
  def setTwitterSystemProperties() {
    val property = new SystemProperties()
    property.put("twitter4j.oauth.consumerKey", appConf.getString("twitter4j.oauth.consumerKey"))
    property.put("twitter4j.oauth.consumerSecret", appConf.getString("twitter4j.oauth.consumerSecret"))
    property.put("twitter4j.oauth.accessToken", appConf.getString("twitter4j.oauth.accessToken"))
    property.put("twitter4j.oauth.accessTokenSecret", appConf.getString("twitter4j.oauth.accessTokenSecret"))
  }

  /*
   * Method builds the properties object with the configuration parameters needed
   * to connect to the Apache Kafka cluster.
   */
  def getKafkaProperties(): Properties = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, appConf.getString("bootstrap.servers.config"))
    props.put(ProducerConfig.CLIENT_ID_CONFIG, appConf.getString("client.id.config"))
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, appConf.getString("key.serializer.class.config"))
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, appConf.getString("value.serializer.class.config"))
    props
  }

  /*
   * Application entry point.
   */
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName(appConf.getString("application.name")).setMaster(appConf.getString("master.mode"))
    val sc = new SparkContext(conf)

    sc.setLogLevel(appConf.getString("log.level"))

    setTwitterSystemProperties()

    val ssc = new StreamingContext(sc, Seconds(5))

    val stream = TwitterUtils.createStream(ssc, None)

    val producer = new KafkaTweetProducer(getKafkaProperties())

    // The tweets received in the last 60 seconds.
    val tweets = stream.window(Seconds(60))

    // The ten most popular hash tags, with counts, over the last 60 seconds.
    val topCounts60 = tweets.flatMap(tweet => tweet.getText.split(" ").filter(_.startsWith("#")))
      .map((_, 1)).reduceByKey(_ + _)
      .map { case (tag, count) => (count, tag) }
      .transform(_.sortByKey(false))

    // Publish the popular hash tags (as a json string) to the kafka topic.
    topCounts60.foreachRDD(rdd => {
      val topList = rdd.take(10)
      if (topList.length > 0) {
        var str = "{\"tweets\":["
        topList.foreach { case (count, tag) => str = str + "{\"tag\":\"%s\", \"count\":%d},".format(tag, count) }
        producer.sendMessage(appConf.getString("twitter.streaming.topic"), str.substring(0, str.length() - 1) + "]}")
      }
    })

    // Run sentiment analysis on the tweets from the last 60 seconds that contain a hash tag.
    val data = tweets.filter(tweet => tweet.getText.contains("#")).map(tweet => {
      val sentiment = detectSentiment(tweet.getText)
      (tweet.getText, sentiment)
    })

    // Publish each analysed tweet, together with its sentiment, to the kafka topic.
    data.foreachRDD(rdd => {
      rdd.collect().foreach { case (text, sentiment) =>
        val str = "{\"tweet\":\"%s\", \"sentiment\":\"%s\"}".format(text, sentiment)
        producer.sendMessage(appConf.getString("twitter.sentiment.topic"), str)
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
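The Node server (web/server.js) consumes these topics via kafka-node, but it can also be useful to inspect the published messages from the JVM side. A hypothetical sketch (not part of the PR) using the same kafka-clients 0.9 consumer API; the broker address and group id are assumptions:

```scala
import java.util.{Arrays, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer

object TopicTail {
  def main(args: Array[String]) {
    // Assumes a broker on localhost:9092; "debug-tail" is an arbitrary group id.
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("group.id", "debug-tail")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(Arrays.asList("twitterTopic", "sentimentTopic"))

    // Poll forever, printing every json message the streaming job publishes.
    while (true) {
      val records = consumer.poll(1000)
      val it = records.iterator()
      while (it.hasNext()) {
        val record = it.next()
        println(record.topic() + ": " + record.value())
      }
    }
  }
}
```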