
Spark Streaming and Twitter Sentiment Analysis

This blog post is the result of my efforts to show a coworker how to get the insights he needed by using the streaming capabilities and concise API of Apache Spark. In this blog post, you'll learn how to do some simple, yet very interesting, analytics that will help you solve real problems by analyzing specific areas of a social network.

A subset of a Twitter stream was the perfect choice for this demonstration, since it has everything we need: an endless, continuous source of data that is ready to be explored.

Spark Streaming, Minimized

Spark Streaming is very well explained here and in chapter 6 of the ebook “Getting Started with Apache Spark,” so we are going to skip some of the details about the Streaming API and move on to setting up our app.

Setting up Our App

Let’s see how to prepare our app before doing anything else.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter.TwitterUtils

val config = new SparkConf().setAppName("twitter-stream-sentiment")
val sc = new SparkContext(config)
sc.setLogLevel("WARN")

// Batches arrive every 5 seconds.
val ssc = new StreamingContext(sc, Seconds(5))

// Replace the placeholders with your own Twitter credentials.
System.setProperty("twitter4j.oauth.consumerKey", "consumerKey")
System.setProperty("twitter4j.oauth.consumerSecret", "consumerSecret")
System.setProperty("twitter4j.oauth.accessToken", "accessToken")
System.setProperty("twitter4j.oauth.accessTokenSecret", "accessTokenSecret")

val stream = TwitterUtils.createStream(ssc, None)

Here we have created the Spark context sc and set the log level to WARN to eliminate the noisy logs Spark generates. We also created a streaming context ssc using sc. Then we set up our Twitter credentials (before doing this, we needed to follow these steps), which we got from the Twitter website. Now the real fun starts.

What is Trending Right Now on Twitter?

It is easy to find out what is trending on Twitter at any given moment; it is just a matter of counting the appearances of each tag on the stream. Let’s see how Spark allows us to do this operation.

val tags = stream.flatMap { status =>
  status.getHashtagEntities.map(_.getText)
}

tags.countByValue()
  .foreachRDD { rdd =>
    val now = org.joda.time.DateTime.now()
    rdd
      .sortBy(_._2)                         // sort tags by their counts
      .map(x => (x, now))                   // attach a timestamp to each record
      .saveAsTextFile(s"/tmp/twitter/$now") // note: "~" is not expanded by Hadoop's file APIs, so use an explicit path
  }

First, we got the tags from the Tweets, counted how many times each one appeared, and sorted them by that count. After that, we persisted the result in order to point Splunk (or any other tool, for that matter) at it. We could build some interesting dashboards using this information in order to track the most trending hashtags. Based on this information, my coworker could create campaigns and use these popular tags to attract a bigger audience.
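
If you just want to eyeball the current leaders instead of persisting everything, a small variation prints the most frequent tags in each batch (the top-10 cutoff here is my own choice, not from the original post):

// A minimal sketch: print the ten most frequent tags per batch.
tags.countByValue()
  .foreachRDD { rdd =>
    rdd.sortBy(_._2, ascending = false)   // most frequent first
      .take(10)
      .foreach { case (tag, count) => println(s"#$tag: $count") }
  }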

Analyzing Tweets

Now we want to add functionality to get an overall opinion of what people think about a set of topics. For the sake of this example, let’s say that we want to know the sentiment of Tweets about Big Data and Food, two very unrelated topics.

There are several APIs for analyzing sentiments from Tweets, but we are going to use an interesting library from The Stanford Natural Language Processing Group in order to extract the corresponding sentiments.

In our build.sbt file we need to add the corresponding dependencies.

libraryDependencies += "edu.stanford.nlp" % "stanford-corenlp" % "3.5.1"
libraryDependencies += "edu.stanford.nlp" % "stanford-corenlp" % "3.5.1" classifier "models"
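
Depending on how your project is set up, you may also need Spark Streaming and its Twitter connector on the classpath. The versions below are illustrative, and must match your Spark installation:

// Assumption: Spark 1.6.x; align these versions with your cluster.
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.0" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming-twitter" % "1.6.0"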

Now, we need to select only those Tweets we really care about by filtering the stream on certain hashtags (#). This filtering is quite easy, thanks to a unified Spark API.

Let’s see how.

val tweets = stream.filter { t =>
  val tags = t.getText.split(" ").filter(_.startsWith("#")).map(_.toLowerCase)
  tags.contains("#bigdata") && tags.contains("#food")
}

Here, we get all the tags in each Tweet and check that it has been tagged with #bigdata and #food.
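
As a side note, splitting on spaces can miss hashtags that aren't whitespace-delimited. An equivalent filter (a sketch of an alternative, not from the original post) can rely on Twitter's already-parsed entities, which return the tag text without the leading #:

// Alternative sketch using twitter4j's parsed hashtag entities.
// getText here returns the tag without the leading "#".
val tweets = stream.filter { t =>
  val tags = t.getHashtagEntities.map(_.getText.toLowerCase)
  tags.contains("bigdata") && tags.contains("food")
}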

Once we have our Tweets, extracting the corresponding sentiment is quite easy. Let's define a function that extracts the sentiment from the Tweet's content so we can plug it into our pipeline.

def detectSentiment(message: String): SENTIMENT_TYPE
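
SENTIMENT_TYPE isn't defined in this post. Judging from the values the tests below use (plus VERY_NEGATIVE, since CoreNLP's sentiment model uses five classes), one plausible shape for it is:

// An assumed definition, mirroring CoreNLP's five sentiment classes
// plus a fallback for input we cannot score.
sealed trait SENTIMENT_TYPE
case object VERY_NEGATIVE extends SENTIMENT_TYPE
case object NEGATIVE extends SENTIMENT_TYPE
case object NEUTRAL extends SENTIMENT_TYPE
case object POSITIVE extends SENTIMENT_TYPE
case object VERY_POSITIVE extends SENTIMENT_TYPE
case object NOT_UNDERSTOOD extends SENTIMENT_TYPE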

We are going to use this function, assuming it does what it should, and we will put its implementation at the end, since it’s not the focus of this post. In order to get an idea of how it works, let’s build some tests around it.

it("should detect not understood sentiment") {
     detectSentiment("") should equal (NOT_UNDERSTOOD)
}

it("should detect a negative sentiment") {
     detectSentiment("I am feeling very sad and frustrated.") should equal (NEGATIVE)
}

it("should detect a neutral sentiment") {
     detectSentiment("I'm watching a movie") should equal (NEUTRAL)
}

it("should detect a positive sentiment") {
     detectSentiment("It was a nice experience.") should equal (POSITIVE)
}

it("should detect a very positive sentiment") {
     detectSentiment("It was a very nice experience.") should equal (VERY_POSITIVE)
}

These tests should be enough to show how detectSentiment works.
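
For completeness, here is one way the function could be implemented with CoreNLP 3.5.1. This is my own sketch of the approach (averaging CoreNLP's per-sentence scores, where 0 is very negative and 4 is very positive), not necessarily the original implementation:

import java.util.Properties
import scala.collection.JavaConverters._
import edu.stanford.nlp.ling.CoreAnnotations
import edu.stanford.nlp.neural.rnn.RNNCoreAnnotations
import edu.stanford.nlp.pipeline.StanfordCoreNLP
import edu.stanford.nlp.sentiment.SentimentCoreAnnotations

object SentimentAnalysisUtils {

  // Lazily built so each executor creates the pipeline once,
  // instead of shipping it inside the closure.
  private lazy val pipeline = {
    val props = new Properties()
    props.setProperty("annotators", "tokenize, ssplit, parse, sentiment")
    new StanfordCoreNLP(props)
  }

  def detectSentiment(message: String): SENTIMENT_TYPE = {
    if (message == null || message.trim.isEmpty) return NOT_UNDERSTOOD

    val annotation = pipeline.process(message)
    val sentences =
      annotation.get(classOf[CoreAnnotations.SentencesAnnotation]).asScala

    if (sentences.isEmpty) return NOT_UNDERSTOOD

    // CoreNLP scores each sentence from 0 (very negative) to 4 (very positive).
    val scores = sentences.map { sentence =>
      val tree = sentence.get(classOf[SentimentCoreAnnotations.AnnotatedTree])
      RNNCoreAnnotations.getPredictedClass(tree)
    }

    // Map the average score back onto our sentiment type.
    scores.sum.toDouble / scores.size match {
      case s if s < 0.5 => VERY_NEGATIVE
      case s if s < 1.5 => NEGATIVE
      case s if s < 2.5 => NEUTRAL
      case s if s < 3.5 => POSITIVE
      case _            => VERY_POSITIVE
    }
  }
}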

Now let’s see an example of using it in our pipeline.

val data = tweets.map { status =>
   val sentiment = SentimentAnalysisUtils.detectSentiment(status.getText)
   val tags = status.getHashtagEntities.map(_.getText.toLowerCase)

   (status.getText, sentiment.toString, tags)
}

data represents a DStream of the Tweets we want, the associated sentiment, and the hashtags within each Tweet (here we should find the tags we used to filter).
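
While developing, a quick way to sanity-check the stream is to print a few records per batch:

// Prints the first ten elements of each batch to the driver's console.
data.print()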

SQL Interoperability

Now we want to cross reference the sentiment data with an external dataset that we can query using SQL. For my coworker, it makes a lot of sense to be able to join the Twitter stream with his other dataset.

Let’s take a look at how we could achieve this.

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)

import sqlContext.implicits._

data.foreachRDD { rdd =>
  rdd.toDF().registerTempTable("sentiments")
}

We have transformed our stream into a different representation (a DataFrame), which is backed by the same Spark concepts (resilient, distributed, very fast), and exposed it as a table so my coworker can use his beloved SQL to query different sources.

The table sentiments (which we defined from our DataFrame) can be queried like any other table in his system. Another possibility is to query other data sources (Cassandra, XML files, or our own binary formats) using Spark SQL and cross them with the stream.

You can find out more information about this topic here and here.

An example of querying a DataFrame is shown next.

sqlContext.sql("select * from sentiments").show()
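
Since the DataFrame was built from tuples, its columns default to _1 (the text), _2 (the sentiment), and _3 (the tags). For example, a sketch of counting Tweets per sentiment in the current batch:

// Aggregate the registered table by sentiment label.
sqlContext.sql(
  "select _2 as sentiment, count(*) as total from sentiments group by _2"
).show()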

Windowed Operations

Spark Streaming has the ability to look back in the stream using windowed operations, a capability that many streaming engines either lack or make very hard to use.

In order to implement a windowed operation, you’ll need to checkpoint the stream, but this is an easy task. You’ll find more information about this here.

Here’s a small example of this kind of operation:

tags
  .window(Minutes(1))
  . (...)
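
As a more concrete sketch (the one-minute window and 30-second slide are my own choices), we could count hashtags over the last minute, and then, since nothing runs until the context starts, finish by starting the streaming context:

// Windowed operations require a checkpoint directory.
ssc.checkpoint("/tmp/checkpoint")

// Count each tag over the last minute, recomputed every 30 seconds.
tags.countByValueAndWindow(Minutes(1), Seconds(30))
  .foreachRDD { rdd =>
    rdd.sortBy(_._2, ascending = false).take(10).foreach(println)
  }

// Nothing happens until the streaming context is started.
ssc.start()
ssc.awaitTermination()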

Conclusion

Even though our examples are quite simple, we were able to solve a real-life problem using Spark. We now have the ability to identify trending topics on Twitter, which helps us both target and increase our audience. At the same time, we are able to access different data sets using a single set of tools such as SQL.

Very interesting results came back from #bigdata and #food at the same time. Perhaps people Tweet about big data at lunch time—who knows?

Reference: Spark Streaming and Twitter Sentiment Analysis from our JCG partner Chase Hooley at the MapR blog.

Nicolas A Perez

Nicolas is a software engineer at IPC, an independent SUBWAY® franchisee-owned and operated purchasing cooperative, where he works on their Big Data Platform. He is very interested in Apache Spark, Hadoop, distributed systems, algorithms, and functional programming, especially in the Scala programming language.