Kevin Haas

Analyzing #PWorld15 Tweets in Real Time

Blog Post created by Kevin Haas on Dec 15, 2017

This post was originally published by Chris Deptula on Tuesday, October 27, 2015


I recently attended the Strata-HadoopWorld conference in NYC. I have been attending this conference for the past few years, and each year a theme emerges. A few years ago it was SQL on Hadoop; last year it was all Spark. This year there was a lot of buzz about streaming and continuous data ingestion. Although these are often presented as different topics, the line between them is blurring.


To simplify, streaming means working with queues of events that are constantly flowing via a messaging system. Continuous ingestion is used more broadly and is the process of continuously ingesting from any data source, whether that be processing a file as soon as it is received, monitoring the database for new inserts, or streaming from a message queue.


One of the challenges with continuous ingestion is making data immediately available for analytics in a performant manner. For smaller amounts of data this can often be done using an RDBMS; however, for Big Data more complexity is required. Cloudera is trying to solve some of these challenges with Kudu; however, Kudu is still in beta and should not be used in production. Shortly before Strata-HadoopWorld, I started working on a demo for PentahoWorld using an architecture that solves this problem.


Before we go any further, let’s take a step back and explain the use case for our PentahoWorld demo of real-time analytics. I wanted to build a system that ingested a stream of tweets about the conference in real time while also making them available for analytics. The analytics are presented using a Pentaho CDE dashboard and a Mondrian cube for slicing and dicing in Pentaho Analyzer. For the dashboard I performed a bit of Natural Language Processing on the tweets to determine their sentiment, and then made both aggregate metrics and individual tweets available as soon as each tweet was received. For the Pentaho Analyzer slice and dice capabilities, I allowed the data to be slightly older (not real time), but did update the underlying cube frequently.


How did I do this? At a high level I built a Pentaho Data Integration (PDI) transformation that acted as a daemon. This transformation used the Twitter Streaming API to get a continuous stream of tweets with the #PWorld15 hashtag, and did not stop processing new data until the transformation was manually stopped. I then used the Stanford Sentiment Analysis plugin for PDI to calculate the Wilson Sentiment of each tweet, before writing this data into MongoDB.
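
The shape of that daemon can be sketched outside PDI in a few lines of Python. This is only an illustration of the read, score, write loop, not the actual transformation: the function names are hypothetical, `toy_score` is a placeholder word counter rather than the Stanford sentiment model, and the stream and sink are plain Python objects standing in for the Twitter Streaming API and MongoDB.

```python
from typing import Callable, Dict, Iterable


def run_ingest(
    stream: Iterable[Dict],
    score: Callable[[str], float],
    sink: Callable[[Dict], None],
) -> int:
    """Read events one at a time, attach a sentiment score, and hand each to the sink.

    With a live stream the loop would only end when the process is stopped;
    with a finite iterable (as in a test) it returns the number of events handled.
    """
    count = 0
    for tweet in stream:
        enriched = dict(tweet)  # copy so the input event is not mutated
        enriched["sentiment"] = score(tweet["text"])
        sink(enriched)
        count += 1
    return count


def toy_score(text: str) -> float:
    """Placeholder scorer (assumption): positive word count minus negative word count."""
    positive = {"great", "love", "awesome"}
    negative = {"bad", "slow", "broken"}
    words = text.lower().split()
    return float(sum(w in positive for w in words) - sum(w in negative for w in words))


# Example with an in-memory list as the sink.
stored = []
handled = run_ingest(
    [{"text": "love the great demo"}, {"text": "wifi is slow"}],
    toy_score,
    stored.append,
)
```

Keeping the scorer and the sink as parameters mirrors how the PDI transformation chains separate steps: either one can be swapped without touching the loop.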


Once the data was in MongoDB, I built my Pentaho CDE dashboard. Pentaho CDE has the ability to source data from PDI transformations. I built a few PDI transformations that used the MongoDB input step with aggregation pipelines to calculate the metrics I needed. Using Pentaho transformations as the input saved me from having to write a lot of JavaScript. Then it was simply a matter of telling the CDE components to refresh every 10 seconds and I had built my dashboard!
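
To give a feel for what such an aggregation pipeline looks like, here is a minimal sketch that builds one as plain Python data. The field names (`hashtags`, `sentiment_label`, `sentiment`) are assumptions about how the tweet documents might be stored, not the schema used in the demo; `$match`, `$group`, `$sum`, `$avg`, and `$sort` are standard MongoDB aggregation operators.

```python
import json
from typing import Any, Dict, List


def sentiment_summary_pipeline(hashtag: str) -> List[Dict[str, Any]]:
    """Build a pipeline that counts tweets and averages sentiment per label.

    All field names are hypothetical; adjust them to the real document schema.
    """
    return [
        # Keep only tweets that carry the hashtag of interest.
        {"$match": {"hashtags": hashtag}},
        # One output row per sentiment label, with a count and an average score.
        {
            "$group": {
                "_id": "$sentiment_label",
                "tweets": {"$sum": 1},
                "avg_score": {"$avg": "$sentiment"},
            }
        },
        # Most common label first.
        {"$sort": {"tweets": -1}},
    ]


pipeline = sentiment_summary_pipeline("PWorld15")
pipeline_json = json.dumps(pipeline)  # JSON form, for tools that take the pipeline as text
```

With a driver such as pymongo, a list like this would be passed to `collection.aggregate(pipeline)`; the JSON string is the same pipeline in the form a query box would expect.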


For the cube, Pentaho Analyzer requires a SQL interface to the data. It is true that this could be solved by using PDI’s data service capability, which allows you to use SQL to query the results of a PDI transformation. However, the “PDI as a data service” option has a limitation: the output data from the transformation must fit in memory, which is not practical for Big Data. Instead, I extended the transformation that I used to stream the data from Twitter into MongoDB to also write the data to Hadoop/HDFS. This enabled me to expose the data via Impala through the SQL interface Pentaho Analyzer requires.


Brilliant! But there was a catch. In Hadoop, a file is not readable until the file handle has been closed. If I never closed the file because my transformation was constantly running, then the data would never be available to query. I could have written a file for every record, but this would have resulted in way too many small files and would have been detrimental to performance. Instead, I used the Transformation Executor step in PDI to group the tweets into 10-minute chunks, and then wrote one file every 10 minutes.
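
The chunking idea can be illustrated with a short Python sketch that assigns each tweet to a 10-minute window and derives one file name per window. This is not how the Transformation Executor step works internally; the function names and the file naming scheme are hypothetical, and the sketch groups a finite list rather than closing files as a live stream crosses each window boundary.

```python
from collections import defaultdict
from datetime import datetime, timezone
from typing import Dict, Iterable, List, Tuple

WINDOW_SECONDS = 600  # 10 minutes


def window_start(ts: float, window: int = WINDOW_SECONDS) -> int:
    """Return the epoch second at which the window containing ts begins."""
    return int(ts // window) * window


def group_by_window(
    events: Iterable[Tuple[float, str]], window: int = WINDOW_SECONDS
) -> Dict[int, List[str]]:
    """Group (timestamp, text) pairs by the window they fall into."""
    batches: Dict[int, List[str]] = defaultdict(list)
    for ts, text in events:
        batches[window_start(ts, window)].append(text)
    return dict(batches)


def batch_filename(start: int) -> str:
    """Hypothetical naming scheme: one file per window, stamped in UTC."""
    stamp = datetime.fromtimestamp(start, tz=timezone.utc).strftime("%Y%m%dT%H%M")
    return f"tweets_{stamp}.parquet"


events = [(0.0, "a"), (599.9, "b"), (600.0, "c")]
batches = group_by_window(events)
names = [batch_filename(start) for start in sorted(batches)]
```

The window length is the trade-off knob: shorter windows make data queryable sooner but produce more, smaller files.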


Once I had all of this figured out and working, I tested and encountered yet another problem. Tweets have newlines and lots of special characters. Storing the data in a plain text file was going to be a challenge. The good news is there are better file formats for Hadoop than plain text files, such as Avro and Parquet. So, instead of using text files, I used the Pentaho Consulting Labs Parquet Output step. This had the dual benefit of easily handling newlines and special characters while also improving the underlying performance of Cloudera Impala queries.
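
The newline problem is easy to reproduce. The sketch below (with hypothetical helper names and made-up tweets) writes one tweet per line, the way a naive text output would, and then reads the lines back: a tweet containing a newline comes back as two records.

```python
from typing import List


def write_naive_lines(tweets: List[str]) -> str:
    """Serialize tweets as one record per line, with no escaping."""
    return "\n".join(tweets) + "\n"


def read_naive_lines(blob: str) -> List[str]:
    """Read records back by splitting on line boundaries."""
    return blob.splitlines()


tweets = [
    "Great keynote!\nSee you next year",  # contains an embedded newline
    "On my way to #PWorld15",
]
records = read_naive_lines(write_naive_lines(tweets))
# Two tweets went in, but three "records" come out.
```

A binary columnar format such as Parquet does not rely on line delimiters to separate records, so the embedded newline is just part of the string value.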


Finally, I created a few Impala tables and a Mondrian cube, and solved my problem. I was able to stream tweets in real time through Pentaho to make them immediately available in a dashboard, and, with a modest delay, also have them available for slice and dice analytics.