Student Project

Big Data Analysis of the 2016 Presidential Candidates

1.Introduction

This project uses various Big Data techniques to analyze characteristics of the 2016 Presidential candidates using batch and realtime data processing scenarios. We chose four candidates for our analysis: Donald Trump, Hillary Clinton, Ted Cruz and Bernie Sanders. The entire Reddit corpus from October 2007 through August 2015 was used to evaluate various characteristics of the candidates including post volume over time, the most popular keywords, parts of speech that describe the candidates and sentiment. The intent was to evaluate the rise or decline of these candidates and to tease out insights. We also chose to process Twitter data to evaluate sentiment of Donald Trump in particular given the rise in interest as a candidate and competing views.

We selected the politics domain given the high volume of data related to the 2016 election and curiosity to explain how these candidates may differ. Our goal for the project was to answer a few primary questions of four presidential candidates:


● Which Presidential candidates have gained interest over the last few years and within the last few weeks? This may give us insight into candidates where support and interest is increasing or decreasing.


● What is the sentiment for the Presidential candidates on social media (Twitter)? Is support improving or not?


● What are the most popular keywords and part of speech tags that characterize each candidate? This may give us insight into key themes or characteristics.


2. Architecture


The overall architecture is composed of batch and speed layer to enable the data analysis scenarios.

Figure 1 Architecture

Batch Layer

The batch layer is powered by a fournode IBM Softlayer cluster in the Dallas05 data center. It includes the following components:


IBM Softlayer 4 node cluster with one headnode and 3 workers. All nodes are configured the same: Ubuntu 14.04, 32GB RAM, 16 cores, 5.7 TiB HDFS

Cloudera Hadoop Distribution Hadoop components including HDFS, Spark, YARN were installed using the Cloudera Hadoop Distribution (Enterprise). CDH was installed on the headnode using the Cloudera Quickstart instructions . Cloudera Manager is the portal for managing the Hadoop services and is accessible at:

http://169.55.6.180:7180/cmf/login using username: admin, password: XXXX

Anaconda Cluster Management analyses were performed using pure Python and the Anaconda Cluster Management tool was used to install Python and Python packages on the distributed cluster. Packages were deployed from a MacBook. Additional configuration details are available in the Appendix.


Python libraries the following Python thirdparty liibraries were used:


hdfs3 Python connector to HDFS
dask, distributed a distributed processing framework on Hadoop using Python
( see appendix for monitoring user interface )
Bokeh interactive visualization
○ Pyspark Spark programming interface
NLTK natural language processing
Matplotlib charting

Amazon S3 the Reddit JSON files (1TB) were downloaded from the Amazon S3 object
store onto the cluster. The data transfer completed in approximately two hours.


Dask Distributed Processing Framework Dask is a parallel processing framework using pure Python on top of the Hadoop stack. The Dask status interface is accessible at:

http://169.55.6.180:8787/status

Jupyter Notebook a Jupyter notebook was used to manage the Python code and connect to the headnode for execution. The Jupyter server was installed and hosted on the headnode. The notebooks at accessible at: https://169.55.6.180:8889/tree 

Speed Layer


There are two data processing pipelines within the speed layer. The first is a pipeline to evaluate live Twitter data on dashboard for a day and the second is a 2 week sentiment analysis pipeline both analyze twitter related to different candidates to get current trends and sentiments.

The dashboard speed layer includes following components:


Elasticsearch creates the index of the tweeter feed and later used by kibana to display and analyze.
Logstash input configured for getting tweeter data continuously
Kibana generate display and analysis dashboard

The Live twitter data dashboard allows for analysis of any signal in user sentiment related to ongoing events and candidate popularity.The ELK stack is installed in two different servers 50.97.205.234 and 198.11.220.112 , where each is dedicated to a candidate we are monitoring.


3. Batch Data Processing Scenarios

The objective of the batch data processing scenario was to generate insights about the
presidential candidates by mining the 1TB Reddit JSON data. The insights were generated
using Spark and a pure Python approach (Dask) to compare and contrast performance and
code implementation. The Jupyter notebook can be found on Github and the Juypter notebook
server dashboard is accessible at https://169.55.6.180:8889/



Data Transfer from Amazon S3


The 1TB of Reddit JSON data was downloaded from Amazon S3 to the Softlayer cluster using
the following command:

hadoop distcp s3n://AWS_ACCESS:AWS_SECRET@blazedata/
reddit/json/*/*.json /user/root


The data set is composed of one JSON file for each month. They are stored in the /user/root folder on HDFS ( See appendix )
 

JSON Data Processing

The approach for data processing for both Spark and Dask were   essentially identical.The 1TB data set was first loaded into a Spark RDD and Dask Bag respectively. The objects were then reduced to the subreddit “politics” so that the data analysis was focused on the political domain. These objects were persisted and then further reduced by each presidential candidate during the analysis sections described below. We found that RDD transformations (filtering) and actions (e.g.counts) consumed substantial memory. For example, the analysis below required more than 12 hours to process the entire 1TB data set. We experimented with persisting the RDD’s but this created memory errors.

Figure 2 Data Processing Performance

We concluded that increasing the cluster size, increasing RAM, preprocessing the JSON files
and persisting the RDD’s and Dask Bag objects in memory would have improved performance
considerably. We used the Dask user interface to monitor performance and graph execution
for gaining insight into optimizations.

Volume Analysis

We were curious to explore the Reddit data to see how the volume of each presidential candidate trended over the last 78 years. Figure 3 displays the number of posts that included the candidate name in the post body for each year from 2007 2015.Note that 2007 (October-December) and 2015 (January-August) contained posts for only part of the year.


Figure 3 Subreddit volume for each candidate (Oct 2007 Aug 2015)

Figure 4: Year over Year volume

Overall the volume of posts increased significantly leading into the 2016 election year. The
number of posts including Hillary Clinton has remained fairly constant over the last few years
while Donald Trump and Bernie Sanders surged 1917% and 567% respectively from 2014 to
2015. These data suggest a growing interest in both of the candidates when compared to
Hillary Clinton and Ted Cruz. Somewhat surprising is that Bernie Sanders was referenced in
48% of the 2015 posts and over 38% of the posts from 2007-2015.

The 2015 surge suggests a growing interest as a viable presidential candidate (blue line). Ted Cruz appeared on Reddit radar starting in 2012 but at a slower rise when compared to Sanders, Clinton and Trump.

 

Figure 5: Candidate Comparison

Top Keyword Analysis

Analyzing the top keywords within the posts that referenced each presidential candidate may
provide some insights into themes or characteristics of the candidates. The “body” attribute of
the JSON was processed using the Natural Language Toolkit (NLTK) Python library to tokenize
the words and calculate the frequencies. The processing produced the top 15 keywords for
each candidate:

Figure 6: Top Keyword Analysis

 

Overall the analysis did not reveal deep insights given that many common words showed up in
the top 15 other than the candidates. It was not surprising that “money” was a top keyword for
Trump while we found it interesting that “Obama” was a keyword for all candidates except
Sanders. Extending the analysis for more keywords may have provided additional insights.
 

Part of Speech Analysis

We also used the NLTK to mine the parts of speech including nouns and adjectives to give us
further insight into themes or characteristics for each candidate. The part of speech processing
produced the the top 15 nouns and adjectives for each candidate:
 

Figure 7: Part of Speech Analysis


● The nouns for Donald Trump suggest a focus on business, money and taxes. Profanity
may suggest a strong opposition from the general public.
● The nouns for Hillary Clinton suggest that war is a theme given her involvement in recent
wars including Libya, Iraq and the Taliban. The analysis of adjectives did not generate
any conclusive insights other than a “running” theme of whether she was going to run for
the presidency.
● Profanity appeared in posts for all candidates except Hillary Clinton.


The part of speech analysis delivered limited insights and we concluded that analysis of more
than 15 keywords is likely required as well as filtering common words that appeared across the
candidates (e.g., candidate, government, election).

4. RealTime Analytics


One of the speed layer implementation we did was to capture the snapshot of the current data
on tweeters for two candidates ( Hillary and Donald Trump) . We implemented the real time
visualization of tweeter data using ELK stack consisting of the following

1. ElasticSearch ( elasticsearch2.3.1)


Elasticsearch is a distributed open source search engine based on Apache Lucene, and
released under an Apache 2.0 license. It provides horizontal scalability, reliability, and multitenant
capability for realtime search.

2. Logstash (logstash1.5.4)

Logstash is a data pipeline that helps collect, parse, and analyze a large variety of structured and unstructured data and events generated across various systems. It provides plugins to connect to various types of input sources and platforms, and is designed to efficiently process logs, events, and unstructured data sources for distribution into a variety of outputs with the use of its output plugins.

3. Kibana ( kibana4.1.1)

Kibana is an open source Apache 2.0 licensed data visualization platform that helps in
visualizing any kind of structured and unstructured data stored in Elasticsearch indexes.
Setup
We put together two softlayer servers,. each with 16G memory and 100G disk space and
installed the ELK stack on both. This allowed us to monitor the two candidates separately and
analyze the data independently.


Logstash configuration


Logstash was configured to capture and filter tweets for a candidate. Logstash is than run using the configuration file to get the input data from tweeter and the output is directed to the elasticsearch .
 

Table 1: Logstash Configuration file

input {


twitter {
consumer_key => "XXXXXX"
consumer_secret => "XXXXX"
oauth_token => "XXXXXXX"
oauth_token_secret => "XXXXXXX"
keywords => ["#hillary"]
full_tweet => "true"
}

}


output {

elasticsearch {
protocol => "http"
host => "localhost"
port => "9200"
index => "twitter"
document_type => "realtime"
}

}

 


Elasticsearch Configuration


Elasticsearch is configured to run on port 9200 and accept the logstash file for indexing.
Kibana Setup and Dashboard creation We created following visualization for each candidate on each server:


1. Popular Hashtags ( Top 10 )
2. Top Tweeted User ( Top 10)
3. Metrics:

 

  • Tweeter Count
  • Unique count of entities.hashtags
  • Unique user.screen_name
  • Unique count of user.time zone


Visualization on the dashboard captures the most popular hastags related to a candidate on that
day. For example, tweets during New York Primary day on April 19th,2016 had following top 5
hashtags #trump2016, #nyvalues, #nyprimary, #primaryday, #ny and for hillary clinton it
included #transcript, #vote, #nyprimary #bern,#imwithher


ELK Data Pipeline

Figure 8: ELK data Pipeline


In a typical ELK stack data pipeline , multiple application server outputs are shipped through
logstash shipper to a centralized logstash indexer. In our example we used twitter as input to
the logstash and set up the logstash and indexer on the same server. Logstash indexer outputs
the data to elasticsearch cluster on the same server machine and this is queried by the Kibana
to display visualizations and build dashboards.



Visualization DashBoards​

We implemented the two dashboards , one for Donald Trump and one for Hillary Clinton which
depicts the current trends about the candidate on tweeterverse . The time line can be configured
for 1 day , 1 hour or 15 minutes depending on where we want to narrow our focus for analysis.

 
Figure 9: Live Visualization Dashboard
  • Dashboard for Hillary Clinton

http://50.97.205.234:5601/#/dashboard/AllaboutHilarytweet

  • Dashboard of Donald Trump

http://198.11.220.112:5601/#/dashboard/AllAboutTrumpTweets

The same concept can be extended to monitor any number of candidates

 

5. Twitter Sentiment Analysis


Overview


In this part of the project, we use Spark, Hadoop, S3 to find out various aggregations about the tweets posted about 'donald trump'. We use spark to collect and process tweets and store the output to locally attached HDFS disks. We periodically move the output files to S3. Finally, we use this data from S3 to create various visualization and for further aggregation using python.

Here, we use the above technologies to search twitter for tweets about 'donald trump'. Using these tweets, we find out the top 10 keywords used in these tweets every hour. We also find out the place each tweet corresponds and we aggregate by the places. We also create a plot of the number of tweets gathered every hour.

Cluster setup

The cluster was setup using 4 softlayer virtual servers, each with 16GB of RAM, 4 vCPUs and two disks of 100GB each. One disk each of the 4 nodes was formatted to HDFS. Then spark cluster was created on top of this hadoop installation. Stock sources were used and spark andhadoop were installed separately.

SPARK Implementation

Spark was installed on top of hadoop and hadoop environment variables were created and
supplied to spark. This way the default output location of the spark program was set to the
HDFS disk. The spark program was created using scala.

 

Figure 10:  Spark Streaming Overview


For removing stopwords, we use a custom english stopword list taken fromhttp://www.ranks.nl/stopwords . We add this list to HDFS so that it's visible by all the nodes.Using batch size of 1 seconds, that is, tweets are gathered every second, and sliding window duration of 1 hour and analysis window duration of 1 hour, we perform our analysis. Since the analysis window duration and sliding window duration are same, spark does not use any data from the previous analysis window.

Since we are dumping data every hour and not using it in memory, this saves RAM and we can
use the aggregated data in a different program for further analysis.

Figure 11: Spark Streaming Sliding Windows

 


For collecting tweets, we use the builtin twitter4j library. The library provides various functions to extract fields from the json returned by the twitter streaming api. Here we extract tweet text, hashtags, keywords, and the places the tweets correspond to. Keywords are the words in the tweet text which are not in the stopwords list.

After the fields are extracted, we perform aggregation on each field and dump the results to
HDFS. In our case, we have performed simple counting aggregation.After the data was dumped, we use alchemyAPI to perform sentiment analysis on top 40 tweets obtained in the current analysis window. We also find the number of unique tweets, hashtags etc. We write all this output to stdout.


A separate bash script is used to run the program and provide various parameters to the program. IO redirection is used to append out the output from stdout to local file 'count_and_sentiments.txt' and write the error and info from stderr to another local file 'error.log' We use the 'error.log' file to debug any errors encountered during the execution of the script. The HDFS files are uploaded to s3 using distcp command in hadoop. And the local file is uploaded is using the AWS CLI.

Python Implementation
 

IPython notebook is used for creating timeseries plots and to find out top keywords from the output of spark program. For this python program, we used the uploaded S3 data as input. Using boto, we gather all keyword files from S3 and group them by the dates and print out the top 10 keywords in the tweets. By using the tweets and their counts we also get the total count of collected tweets every hour.Finally, by parsing the 'count_and_sentiments.txt' output file, we plot average sentiment of top 40 tweets every hour.

Results


Figures 12: References to Donald Trump

 


 

Figure 13: Sentiment Analysis Time Series


Most Positive Hour For Trump’s Twitter Sentiments


Apr 14 3pm-4pm
Overall average sentiment: 0.357062

Table 2: Top tweeter Keywords
Top Keywords Top Hashtags
vote 1228
cruz 878
campaign 621
people 530
watch 504
#Trump 1875
#Trump2016 787
#TrumpTrain 438
#NYPrimary 431
#trump 336



Most Negative Hour For Trump’s Twitter Sentiments
 

Apr 17 1am-2am
Overall average sentiment: 0.245588

Table 3: Top tweeter Keywords

Top keywords and Counts Top hashtags and Counts
infowars 656
delegates 464
vote 461
cruz 405
video 396
#Trump 1192
#Trump2016 268
#trump 218
#ColoradoProtest 182
#TRUMP 159



6. Conclusions and Future Work

Reflecting our project work we concluded that we expended significant time on the infrastructure setup and operationalizing the data processing pipelines when compared to generating deep insights. 

The learning was extensive and with a few more weeks we believe we could have delivered additional insights. We also concluded that the project scope was broad from batch big data processing to realtime analytics and sentiment of Twitter streams. Each team member delivered was focused on one of the three capabilities and with more time we would have likely integrated these together on one infrastructure and user experience.

Additional conclusions:


Batch Layer


● 1 TB Reddit data processing consumed significant memory for both Spark and Dask. Recommendations include increasing cluster RAM, adding nodes to distribute processing, modularizing code, persisting objects in memory, and preprocessing the JSON files offline.


● Cloudera was a very efficient method for installing/modifying Hadoop and monitoring performance, health, services. We found it useful to monitor loads when executing intensive data processing code.


● Dask provided a comparable approach to Spark for distributed processing using pure Python. Anaconda Cluster Management significantly simplified Python library deployment across all nodes of cluster from a client machine (MacBook)


● Keywork and part of speech natural language processing provided limited insights about each candidate. This is an area that would require more investment to improve results including filtering common words and increasing the number of keywords for analysis. It may be also interesting to performance sentiment analysis on the top keywords.


Speed Layer


Data Insights:
1. Noticed a major spike in sentiments when the program was running on April 14th. This was a day after the Town Hall Debate was conducted.


2. Trump’s campaign experienced a major dip in sentiment on April 17. This may have to do with negative news stories from InfoWars (hypothesizing based on corresponding hashtags)


Architectural Insights:
3. Spark cluster failed twice because of memory issues resulting from poorly designed garbage collection in Scala code. In future, scala code written for this could be improved for better garbage collection.
 

Appendix


Anaconda Cluster Management Configuration


The “Bare Metal” installation was used to install Anaconda Cluster Management given that Cloudera and Softlayer were used to create the 4node cluster. The following configuration files were used to deploy Anaconda to the cluster.

 

$ cat ~/.acluster/profiles.d/baremetal.
yaml
name: baremetal
provider: bare_metal
node_id: bare_metal
node_type: bare_metal
user: root
num_nodes: 4
machines:
head:
169.55.6.180
compute:
169.55.6.169
169.44.57.137
169.44.83.67

$ cat ~/.acluster/providers.yaml
bare_metal:
cloud_provider: none
private_key: ~/.ssh/mids251project


 


IBM Softlayer Virtual Servers for Batch Data Processing

 

Figure 14: Cluster Manager

Note that the etc/hosts file must contain the private IP addresses for the Cloudera Hadoop
installation.

10.142.194.32 node1.austin.com node1
10.142.194.42 node2.austin.com node2
10.155.101.160 node3.austin.com node3
10.120.212.6 node4.austin.com node4

The Ubuntu firewall should also be turned off temporarily to enable all ports and protocols for
the Cloudera installation.
 

Cloudera Hadoop Distribution and Cloudera Manager
 

Figure 15 :  Cloudera Hadoop Distribution  and Cloudera Manager

SPARK Configuration for Streaming Data Processing

Figure 16: Spark Streaming Data Processing


Dask status user interface 

Figure 17: Dask status interface (http://169.55.6.180:8787/status)


References
 

  1. Reddit JSON data structure: https://github.com/reddit/reddit/wiki/JSON
  2. Anaconda for Cluster Management: http://docs.continuum.io/anacondacluster/installation
  3. Dask: http://dask.pydata.org/en/latest/
  4. Distributed: http://distributed.readthedocs.org/en/latest/
  5. hdfs3 (HDFS): http://hdfs3.readthedocs.org/en/latest/
  6. Cloudera quickstart: http://www.cloudera.com/documentation/enterprise/latest/topics/cm_qs_qui...
  7. GitHub project repository:
    1. https://github.com/jamesgray007/BerkeleyW251Project
    2. http://danielfrg.com/blog/2015/07/21/reproduceitredditwordcountdask/
  8. Jupyter Notebook server installation:http://jupyternotebook.readthedocs.org/en/latest/public_server.html
  9. Logstash deep dive:https://www.elastic.co/elasticon/conf/2016/sf/divedeepwithlogstashfrompi...
  10. Elasticsearch guide: https://www.elastic.co/guide/index.html
w251-project-figure0.allcandidates.png
Architecture Overview
Architecture Overview
w251-project-figure3.png
Candidate Comparison
Candidate Comparison
ELK Stack - Data Pipeline
ELK Stack - Data Pipeline
Realtime Visualization Dashboard
Realtime Visualization Dashboard
Spark Streaming Architecture
Spark Streaming Architecture
spark streming Sliding window
spark streming Sliding window
spark streaming - Donald Trump reference
spark streaming - Donald Trump reference
spark streaming and Sentiment Analysis
spark streaming and Sentiment Analysis
apendix Cloudera Manager
apendix Cloudera Manager
appendix Cloudera Manager -II
appendix Cloudera Manager -II
Dask Dashboard
Dask Dashboard
w251-project-figure16-appendix2-spark-streaming.png

Last updated:

October 7, 2016