Category Archives: Rittman Mead

Rittman Mead at BIWA Summit 2017

I'm excited to be attending my first ever BIWA Summit next week (which will take me to Oracle HQ at Redwood Shores for the first time too!). This three day conference is one of the major dates in the conference calendar for all Oracle Analytics folk, and I'm proud to have opportunity to present three papers:

  • Analysing the Panama Papers with Oracle Big Data Spatial and Graph

    31st January, Room 103, 15:45

    Based on an article I wrote recently, I'll be talking about how to use property graph analysis through Oracle's Big Data Spatial and Graph tool to examine and analyse the relationships in the Panama Papers dataset. Complex relationships that would be all but impossible to query in relational SQL can be uncovered using built in algorithms as well as with Property Graph Query Language (PGQL). I'm using my new favourite tool, interactive notebooks, to demonstrate PGQL as well as the PGX interface.

  • Kafka's Role in Implementing Oracle's Big Data Reference Architecture on the Big Data Appliance

    1st February, Room 102, 14:20

    Apache Kafka is rapidly becoming accepted as a de-facto means of building a data pipeline through a business, ensuring availability of data to and from all systems that need it. In this presentation I go in to the detail of what Apache Kafka is, the problems that it solves - and then put this in context of the Oracle Information Management and Big Data Reference Architecture.

  • (Still) No Silver Bullets : OBIEE 12c Performance in the Real World

    2nd February, Room 203, 13:30

    One of my favourite presentations to deliver, this dives into what you should - and shouldn't - do when building an OBIEE system. It explains how to troubleshoot performance issues methodically - and not a best practice in sight!

There's a full listing of all sessions here, with a PDF to download here.

You can follow the conference proceedings on twitter with the hashtag #BIWASummit, and I'll be tweeting about it to as @rmoff. The presentations that I'm delivering will be available to download on speakerdeck.

Race Against the Machine – Skilling in the Name

Firstly apologies for the awful title. We’ll see by the end of this post whether the pun works!

At Rittman Mead we see the Data & Analytics market and indeed the broader technology market continually changing.

Investment in technology to move organisations ahead of their competitors seems higher than ever and gone are the days that large IT projects are seen as purely a huge cost to the business.

They are getting genuinely measurable ROI now.

We’ve also observed an ever changing landscape in terms of the new features and functionalities of tools. It makes us wonder if we’re in a situation where these systems are going to end up way ahead of a person's ability to use it to it’s best potential.

Take the Hadoop Ecosystem for example. Every large organisation in the world is starting to take Big Data seriously, however barely anyone had even seen the different tools surrounding it until a few years ago.

That’s why we believe that it’s everyone’s responsibility to put Learning & Education towards the highest end of their priority lists at the start of each year.

  • That goes for course developers & deliverers such as Rittman Mead, Oracle and Cloudera to name a few. The onus is on us to provide the necessary learning opportunities and courses.

  • It also goes for organisations who must invest in their staff through training and education. The benefits to companies are huge. It shows that level of commitment to your staff which can lower attrition and increase productivity.

  • And it certainly goes for anyone in the technology space to constantly pick up new skills and experiences that will help them for years to come.

It would be mad for me to go out and buy a TaylorMade M1 golf club for £250 before I’ve learnt how to hit a golf ball straight (I really struggle with a slice!).

In the same respect, a company should never invest millions on a system and then fail to train it’s teams how to use it.

We’ll always be racing against machine but with the right learning perspective in place we can definitely all keep up.

Look out for our post next week when we review our Training in 2016 and take a look at what's on the horizon in 2017.

Time Series Visualisations: Kibana or Grafana?

Grafana has rapidly become one of the de-facto “DevOps” tools for real time monitoring dashboards of time series metrics. In addition to its powerful visualisations, Grafana is not tied to a particular stack or vendor, and supports multiple backend data sources including InfluxDB, Graphite, Elasticsearch and many others which can be added via plugins.

Another similar tool, Kibana is the data visualisation front end for the Elastic Stack, complementing the rest of the stack which includes Beats, Logstash (ingest) and Elasticsearch itself (storage). With the version 5.x release of the Elastic Stack, Kibana now includes Timelion for interactive time series charts.

Here at Rittman Mead we are big fans of both tools, and have written about them over the years (see 1, 2, 3). Our industry-leading Performance Analytics solution for OBIEE is built on top of these tools, and takes advantage of the time series features to provide interactive web-based dashboards presenting a “full stack" view of the important metrics relating to OBIEE's performance.

To give you an idea of what we’ve built, here is a sample dashboard from our Performance Analytics tool. We use both Grafana and Kibana, to present different views of data. The dense dashboards of time series metrics work brilliantly in Grafana:

To enable the user to view and analyse performance data across multiple dimensions we use Kibana, which does a stirling job:

With the recent release of Timelion - a time series visualisation plugin for Kibana - out of beta and into the big time, we wanted to ensure we were still using the right tool for the right job. Did we still need Grafana in our stack for visualisation of time series metrics, or could Timelion fill that gap now, and enable us to streamline our platform’s toolset?

In this article we’ll see how Timelion and Grafana stack up against each other. The intention is not to define which is “best” (a pointless exercise), nor create an unintelligible grid of down-in-the-weeds features that each may or may not support, but to see how the two tools compared in real-world usage, side by side. Which makes it easier to build charts? Which produces a nicer-looking dashboard at the end of it? Which has the best UI and UX for the end user reading and analysing the data? What limitations -if any- are there on data sources and functionality in analysing that data? And ultimately, can we unify our product’s front end on a single one of these tools?

Introduction to Timelion

Since version 5 of Kibana, Timelion (pronounced "Timeline") has been included as part of the default installation. Charts are defined using a bespoke query language, which specifies both the source of the data, functions to apply to it, and how it is presented. The query is specified in a textbox in the Timelion interface. In this simple chart here we’re using the expression .es(*) to show the total number of documents in Elasticsearch, over time:

Every Timelion expression starts with a data source function and continues with a chain of functions that are connected with a dot. Over 20 functions are provided, across three groups:

  1. Data sources - the default is Elasticsearch, and other APIs such as World Bank and Quandl are also available.
    For example in the graph above, the default expression .es(*) (similar to .elasticsearch(*)) shows a count of all documents in Elasticsearch. You can specify details of the Elasticsearch index, mappings and metrics here too, as well as filters.

  2. Data manipulations ranging from simple arithmetic to moving averages, cumulative sums and derivatives
    For example, adding a moving average to the data is as simple as including the function to the end of the expression: .es(*).movingaverage(12)

  3. Themes and styles of the visual elements including bar/point/lines, labels, title and legends. The graph below shows the number of running queries by time extracted from the active session history data in the Oracle database. .es(index=ash*).lines(1,fill=1).title('Running Queries').legend(none).label(false)

With regards to the available documentation and guides for the developers, the main documentation for Timelion is somewhat sparse. For details of each function you can refer to the documentation on github. Compared to the rest of the excellent Elastic documentation, this is surprising and hopefully now that Timelion is part of the core product its documentation will be brought up to parity - full explanations of features and functions along with examples of usage.

On the positive side, the query builder text box supports auto-complete of functions and their arguments, and the Timelion interface provides online help too. A downside to this minimalist Timelion page is the size of the expression textbox. As you will read more in this post, it wouldn’t take long before you need to add more than one metric and a few styles to a visualisation which means having too many words in the textbox that can’t be seen, scrolled and edited easily:

If you are a beginner, to avoid the confusion over typos and errors, try building the expressions step by step and add functions gradually. The blog here nicely explains how to gradually create Timelion expressions.

Of special note in the data manipulation functions that Timelion provides are the statistical analysis ones:

  • .trend() : add a trendline using a specified regression algorithm to your graph
  • .holt(): an early version of this function, which samples the beginning of a series and use it to forecast what should happen via several optional parameters.

These are useful for our performance monitoring dashboards, enabling us to show things such as the point at which you would run out of memory/disk space if you continued to consume resources at your current rate.

Related to this concept is Prelert, which Elastic acquired next year and is expected to be part of a future X-Pack release. Whilst dashboard-based analysis is useful, once a clear pattern on which we want to alert is identified we can bring in Watcher to provide real time notifications to pager systems etc.

Introduction to Grafana

Grafana is an open source feature rich dashboard and graph editor that is rapidly becoming accepted as one of the best time-series metric visualisation tools available. Grafana has gained its popularity thanks to its simplicity, ease of use and snazzy look and feel that attracts many users. You can read more about Grafana in an earlier article that we wrote on the Rittman Mead blog here. Here is the kind of dashboard you can easily build with Grafana:

Most of the configurations in Grafana are done via a comprehensive graph editor interface:

In the Grafana editor queries are generally built entirely through the GUI. Manually specified queries are used in cases such as accessing advanced functionality, and for specifying Lucene queries for in order to access data held in Elasticsearch. In terms of support for Elasticsearch, the latest version of Grafana at the time of writing this post (v4.1.1) supports both Elasticsearch v2 and v5. From my time spent working with Grafana 4.1.1 and Elasticsearch v5 I haven’t found it to be as stable as the long-standing data sources such as InfluxDB and Graphite (or even Elasticsearch v2). As an example, if a chart is configured incorrectly (for example settings for null values), Grafana is not as intuitive in returning no results or throw a descriptive error explaining the issue; instead the graph seems locked and the only possible solution for this behaviour seems to be deleting the chart and recreating it from scratch.

As well as data sources the graph editor includes settings covering display styles such as titles, templates, axis and legends.

A interesting new addition to the Grafana family is the alerting engine which allows users to attach rules to the dashboard panels. Once dashboards are saved Grafana will extract the alert rules into a separate alert rule storage and schedule them for evaluation.

Side-by-Side : Presenting the Data

On the face of it, the output from Grafana and Timelion can be remarkably similar:

Grafana on the right and Timelion is the left graph

However, there are a few differences between the two tools that are worth digging into here. They are mainly on the display configuration part and simplicity of the user experience.

As mentioned, Grafana’s chart editor has a clear interface over the multitude of options available for refining the presentation of the data.

Timelion also supports chart formatting, but with fewer options than Grafana. It also depends on the user concatenating the correct functions onto the data query expression as we saw above. For example to add a graph that has a “Running Queries” title, a legend on the top right of the plot, not labeled axes and data shown with a 1px width line, you would need to hand-code the this expression: .lines(1,fill=1).title('Running Queries').legend(ne).label(false)

Grafana offers significantly greater flexibility in the formatting of the chart. One example is displaying metrics of different units such as time, currency and data. Grafana can automatically scale axes based on the units (bytes -> MB -> GB). The following Grafana graph shows disk usage from our monitored application stored in Elasticsearch. The disk usage metric on the Y axis is in Kilobytes, which Grafana has automagically scaled to the appropriate magnitude (MiB) in the labelling:

The same could be done manually in Timelion by specifying the appropriate conversion, but this is a hardcoded option compared to Grafana’s dynamic one, and even then wouldn’t have the varying labeling that Grafana does above (KiB initially, switching to MiB subsequently)

Grafana also supports the rendering of negative values on the Y axis, which is just not possible in Timelion. As well as genuinely negative data values (for example, temperature recordings below zero degrees), using transform feature of Grafana it is possible to invert particular series so as to aid the comprehension of the data as seen here:

Another nice feature that Grafana has - and unfortunately Timelion doesn’t - is the ability to show metric values in the legend itself. It’s a great way to see key values at a glance, without requiring a separate table or the user to hover over the data points.

Side-by-Side : Interacting with the Data

Grafana and Kibana are also different in terms of the level and ease with which it is possible to interact with the charted data. Both Kibana and Grafana support the drag-select of time periods on a chart to zoom into detail, with the rest of the charts on the same dashboard updating to show the same time period too. However, Kibana is much more feature-rich in this area. As a front end to Elasticsearch it supports ad-hoc text search of your data. It also allows users to automatically drill down into data, by clicking on a value in a chart to show details just for that. In the OBIEE monitoring dashboard below (built in Kibana), Active Session History data is filtered for the session_states in “Waiting” and “On CPU” - this filter was created by the user simply by clicking on the data points in one of the charts, and can be toggled dynamically from the same interface.

This interactivity is supported by Timelion too. The es() datasource function includes an argument called “kibana”. This argument defines whether the visualisation should follow the filters applied to the rest of the Kibana dashboard or not, for example:
.es(index=dms_*,metric='avg:obips1-Current_Disk_Usage',fit='nearest',kibana='true')

Whilst it is possible to specify Elasticsearch Lucene queries in Grafana and use term filters in the editor, these are local to the graph. With some use of variables it can be possible to enable a degree of global filtering on a single Grafana dashboard but this is a bespoke solution per-dashboard, rather than the out-of-the-box functionality that Kibana provides.

Grafana does enable you to toggle the display of data in a chart, by clicking on the measure label in the legend, seen above.

Conclusion

Comparing Kibana and Timelion to Grafana, it is true that they do a similar job displaying time series metrics - with pros and cons on each side.

Grafana’s graph editor offers an amazing interface with regards to the options available for refining the presentation of the data. Grafana is not only an straightforward development tool but also adds a huge amount of value to the resulting dashboards making them easier to read and analyse by the end users

On other hand, Timelion is just one of many visualisations that Kibana provides (including Tile Map and Tag Cloud), meaning that dashboards can be built which are less dense with numbers and time series but information is shown through variety of visualisations. Unfortunately Timelion and its expression editor at its current version seem slightly immature and relatively limited. A few more additional display options plus a nicer editor would put Timelion in a better position in comparison.

So, for now, we’ll be sticking with our dual approach of both Grafana and Kibana. Grafana provides our pure time-series metric dashboards, with the ease-of-building being one of the key factors, along with the rich formatting capabilities and its support for a data sources rather than Elasticsearch. Kibana does an unbeatable job of dashboards enabling rich exploration of metrics across dimensions, rendered in a greater number of possible visualisation forms. Timelion is a great first step, but ultimately just can’t compete with Grafana.

This is a fast-moving area of tool development, and you can bet that Grafana and Kibana are going to continue developing at a rate of knots - which as users and developers is great news!

Analyse Your Data on the Go with Oracle Synopsis App

Analyse Your Data on the Go with Oracle Synopsis App

How many times did you receive an email on your phone with an Excel or CSV attachment you wanted to analyse immediately in an app on your mobile, without having to wait until you reach the laptop? Not only viewing raw numbers but also creating graphs and summaries with the possibility to share the end result with your colleagues?

Analyse Your Data on the Go with Oracle Synopsis App

Your prayers have been answered with the recent release from Oracle of Oracle Synopsis. This is a new mobile app available for Android devices (an iOS version coming soon) that enables building data analyses on the go by interacting with data directly on the smartphone/tablet. It is a free application that doesn't require any OBIEE backend or additional licensing.



Oracle's mobile ecosystem so far has been represented by Oracle BI Mobile HD: an app available since several years, requiring an additional license, which focused on the visualization of OBIEE's pre-built content like dashboard, analysis or alerts. The main limitation of BI Mobile HD is that all content must be created upfront in a computer browser in order to be accessed by the application, no "analysis" option was available other than predefined drilling or navigation capabilities.

Synopsis extends Oracle's mobile ecosystem by adding an app capable of analysing data on the go, interacting directly with files on the phone/tablet in a visual and intuitive way.

Let’s have a look at how Synopsis works. For my test I'll use the Federal U.S. Electric Utility Companies and Rates data coming from en.openei.org.
I first downloaded the CSV file in my phone, then opened the Oracle Synopsis App and selected the file from the "Downloads" folder.

Analyse Your Data on the Go with Oracle Synopsis App

By default, just by opening the file with Synopsis app, I get a project named as the source file (iouzipcodes2011) showing a bar chart of ZIP by UTILITY_NAME. I'm able to switch the coordinates of the graph by either changing the dimension of the X-axis or the measure on the Y-axis by selecting another option from the sections on top and bottom of the graph respectively.

Analyse Your Data on the Go with Oracle Synopsis App

The default graphs are provided just by opening the file, without having to define any measure or dimension.

The question now is: What am I looking at? How do I change the default behaviour?
I can get an idea about what Synopsis is doing by clicking on the project title itself. By default the application associates all text columns to dimensions and all numeric columns to measures, aggregating them with SUM. I can however change the default behaviour by:

  • Changing the aggregation method for the measures, possible alternatives are Average and Count
  • Changing the text and numbers (dimension and metrics) assignments by clicking on the cog icon and accessing the related screen
  • Hide a column by just tapping on the column name (in the grey box shown in the image below), a line will be shown on top the hidden column name.

Analyse Your Data on the Go with Oracle Synopsis App

Measures format can be changed just by sliding the related number tile and setting decimals, currency, percentage format among others.

Analyse Your Data on the Go with Oracle Synopsis App

When clicking on a measure (e.g. ind_rate), a list of graphs, one per dimension, is presented. The type of graphs depends by the attribute type (bar for a text, line trend for a date) and cardinality (a donut is presented when the number of distinct value for an attribute is shown).

Analyse Your Data on the Go with Oracle Synopsis App

I can however change the graph type by clicking on it. There are multiple options:

  • One or more metrics can be added and filters can be applied
  • Attributes can be added and filters can be applied
  • The graph type can be changed

Analyse Your Data on the Go with Oracle Synopsis App

An image of the resulting graph can then be easily sent via mail by clicking on Analyse Your Data on the Go with Oracle Synopsis App

When clicking on padlock icon projects are secured with the fingertip sensor. This option is enabled only in mobile devices supporting the fingertip recognition.

Analyse Your Data on the Go with Oracle Synopsis App

Projects can be exported: a .syn file will be created containing both data and metadata. The project file can then be shared and reopened with the Synopsis app in other mobiles. In this first release is not possible to share projects between Synopsis and other tools like Visual Analyzer or Data Visualization Desktop but it’s an obvious enhancement that one could imagine Oracle considering for the future.

A list of global settings is also available in order to change import settings like blank cells management and CSV delimiter.

Analyse Your Data on the Go with Oracle Synopsis App

Be aware that Synopsis requires the Excel (or CSV) file to be in a pure tabular form; extra heading rows or collapsed cells will prevent Synopsis from parsing it. On top of this, in order for it to work at least one numeric and one text column are needed.

Conclusion

Oracle Synopsis is a great addition to Oracle's mobile offering. It provides an app capable of analysing data on the go in a very visual and intuitive way. So far only CSV and Excel files are supported but my guess is that it will soon be possible to interface to a lot of other applications especially in the Cloud. And - it’s free!

Data Processing and Enrichment in Spark Streaming with Python and Kafka

In my previous blog post I introduced Spark Streaming and how it can be used to process 'unbounded' datasets. The example I did was a very basic one - simple counts of inbound tweets and grouping by user. All very good for understanding the framework and not getting bogged down in detail, but ultimately not so useful.

We're going to stay with Twitter as our data source in this post, but we're going to consider a real-world requirement for processing Twitter data with low-latency. Spark Streaming will again be our processing engine, with future posts looking at other possibilities in this area.

Twitter has come a long way from its early days as a SMS-driven "microblogging" site. Nowadays it's used by millions of people to discuss technology, share food tips, and, of course, track the progress of tea-making. But it's also used for more nefarious purposes, including spam, and sharing of links to pirated material. The requirement we had for this proof of concept was to filter tweets for suspected copyright-infringing links in order that further action could be taken.

The environment I'm using is the same as before - Spark 2.0.2 running on Docker with Jupyter Notebooks to develop the code (and this article!). You can download the full notebook here.

The inbound tweets are coming from an Apache Kafka topic. Any matched tweets will be sent to another Kafka topic. The match criteria are:

  • Not a retweet
  • Contains at least one URL
  • URL(s) are not on a whitelist (for example, we're not interested in links to spotify.com, or back to twitter.com)
  • The Tweet text must match at least two from a predefined list of artists, albums, and tracks. This is necessary to avoid lots of false positives - think of how many music tracks there are out there, with names that are common in English usage ("yesterday" for example). So we must match at least two ("Yesterday" and "Beatles", or "Yesterday" and "Help!").
    • Match terms will take into account common misspellings (Little Mix -> Litle Mix), hashtags (Little Mix -> #LittleMix), etc

We'll also use a separate Kafka topic for audit/debug purposes to inspect any non-matched tweets.

As well as matching the tweet against the above conditions, we will enrich the tweet message body to store the identified artist/album/track to support subsequent downstream processing.

The final part of the requirement is to keep track of the number of inbound tweets, the number of matched vs unmatched, and for those matched, which artists they were for. These counts need to be per batch and over a window of time too.

Getting Started - Prototyping the Processing Code

Before we get into the meat of the streaming code, let's take a step back and look at what we're wanting the code to achieve. From the previous examples we know we can connect to a Kafka topic, pull in tweets, parse them for given fields, and do windowed counts. So far, so easy (or at least, already figured out!). Let's take a look at nub of the requirement here - the text matching.

If we peruse the BBC Radio 1 Charts we can see the popular albums and artists of the moment (Grant me a little nostalgia here; in my day people 'pirated' music from the Radio 1 chart show onto C90 cassettes, trying to get it without the DJ talking over the start and end. Nowadays it's done on a somewhat more technologically advanced basis). Currently it's "Little Mix" with the album "Glory Days". A quick Wikipedia or Amazon search gives us the track listing too:

  1. Shout Out to My Ex
  2. Touch
  3. F.U.
  4. Oops - Little Mix feat. Charlie Puth
  5. You Gotta Not
  6. Down & Dirty
  7. Power
  8. Your Love
  9. Nobody Like You
  10. No More Sad Songs
  11. Private Show
  12. Nothing Else Matters
  13. Beep Beep
  14. Freak
  15. Touch

A quick twitter search for the first track title gives us this tweet - I have no idea if it's legit or not, but it serves as an example for our matching code requirements:

Using the Twitter developer API I can retrieve the JSON for this tweet directly. I'm using the excellent Paw tool to do this.

From this we can get the text element:

"text": "DOWNLOAD MP3: Little Mix \u2013 Shout Out To My Ex (CDQ) Track https:\/\/t.co\/C30c4Fel4u https:\/\/t.co\/wJjyG4cdjE",

The obvious approach would be to have a list of match terms, something like:

match_text=("Little Mix","Glory Days","Shout Out to My Ex","Touch","F.U.")

But - we need to make sure we've matched two of the three types of metadata (artist/album/track), so we need to know which it is that we've matched in the text. We also need to handle variations in text for a given match (such as misspellings etc).

What I came up with was this:

filters=[]  
filters.append({"tag":"album","value": "Glory Days","match":["Glory Days","GloryDays"]})  
filters.append({"tag":"artist","value": "Little Mix","match":["Little Mix","LittleMix","Litel Mixx"]})  
filters.append({"tag":"track","value": "Shout Out To My Ex","match":["Shout Out To My Ex","Shout Out To My X","ShoutOutToMyEx","ShoutOutToMyX"]})  
filters.append({"tag":"track","value": "F.U.","match":["F.U","F.U.","FU","F U"]})  
filters.append({"tag":"track","value": "Touch","match":["Touch"]})  
filters.append({"tag":"track","value": "Oops","match":["Oops"]})

def test_matching(test_string):  
    print 'Input: %s' % test_string
    for f in filters:
        for a in f['match']:
            if a.lower() in test_string.lower():
                print '\tTag: %s / Value: %s\n\t\t(Match string %s)' % (f['tag'],f['value'],a)

We could then take the test string from above and test it:

test_matching('DOWNLOAD MP3: Little Mix \u2013 Shout Out To My Ex (CDQ) Track https:\/\/t.co\/C30c4Fel4u https:\/\/t.co\/wJjyG4cdjE')  
Input: DOWNLOAD MP3: Little Mix \u2013 Shout Out To My Ex (CDQ) Track https:\/\/t.co\/C30c4Fel4u https:\/\/t.co\/wJjyG4cdjE
    Tag: artist / Value: Little Mix
        (Match string Little Mix)
    Tag: track / Value: Shout Out To My Ex
        (Match string Shout Out To My Ex)

as well as making sure that variations in naming were also correctly picked up and tagged:

test_matching('DOWNLOAD MP3: Litel Mixx #GloryDays https:\/\/t.co\/wJjyG4cdjE')  
Input: DOWNLOAD MP3: Litel Mixx #GloryDays https:\/\/t.co\/wJjyG4cdjE
    Tag: album / Value: Glory Days
        (Match string GloryDays)
    Tag: artist / Value: Little Mix
        (Match string Litel Mixx)

Additional Processing

With the text matching figured out, we also needed to address the other requirements:

  • Not a retweet
  • Contains at least one URL
    • URL(s) are not on a whitelist (for example, we're not interested in links to spotify.com, or back to twitter.com)

Not a Retweet

In the old days retweets were simply reposting the same tweet with a RT prefix; now it's done as part of the Twitter model and Twitter clients display the original tweet with the retweeter shown. In the background though, the JSON is different from an original tweet (i.e. not a retweet).

Original tweet:

{
  "created_at": "Thu Jan 12 00:36:22 +0000 2017",
  "id": 819342218611728384,
  "id_str": "819342218611728384",
  "text": "Because we all know how \"careful\" Trump is about not being recorded when he's not aware of it. #BillyBush #GoldenGate",

[...]

Retweet:

{
  "created_at": "Thu Jan 12 14:40:44 +0000 2017",
  "id": 819554713083461632,
  "id_str": "819554713083461632",
  "text": "RT @GeorgeTakei: Because we all know how \"careful\" Trump is about not being recorded when he's not aware of it. #BillyBush #GoldenGate",

[...]

  "retweeted_status": {
    "created_at": "Thu Jan 12 00:36:22 +0000 2017",
    "id": 819342218611728384,
    "id_str": "819342218611728384",
    "text": "Because we all know how \"careful\" Trump is about not being recorded when he's not aware of it. #BillyBush #GoldenGate",
[...]

So retweets have an additional set of elements in the JSON body, under the retweeted_status element. We can pick this out using the get method as seen in this code snippet, where tweet is a Python object created from a json.loads from the JSON of the tweet:

if tweet.get('retweeted_status'):  
    print 'Tweet is a retweet'
else:  
    print 'Tweet is original'

Contains a URL, and URL is not on Whitelist

Twitter are very good to us in the JSON they supply for each tweet. Every possible attribute of the tweet is encoded as elements in the JSON, meaning that we don't have to do any nasty parsing of the tweet text itself. To find out if there are URLs in the tweet, we just check the entities.urls element, and iterate through the array if present.

if not tweet.get('entities'):  
    print 'no entities element'
else:  
    if not tweet.get('entities').get('urls'):
        print 'no entities.urls element'

The URL itself is again provided to us by Twitter as the expanded_url within the urls array, and using the urlsplit library as I did previously we can extract the domain:

for url in tweet['entities']['urls']:

    expanded_url = url['expanded_url']
    domain = urlsplit(expanded_url).netloc

With the domain extracted, we can then compare it to a predefined whitelist so that we don't pick up tweets that are just linking back to sites such as Spotify, iTunes, etc. Here I'm using the Python set type and issubset method to compare the list of domain(s) that I've extracted from the tweet into the url_info list, against the whitelist:

if set(url_info['domain']).issubset(domain_whitelist):  
    print 'All domains whitelisted'

The Stream Processing Bit

With me so far? We've looked at the requirements for what our stream processing needs to do, and worked out the prototype code that will do this. Now we can jump into the actual streaming code. You can see the actual notebook here if you want to try this yourself.

Job control variables

batchIntervalSec=30  
windowIntervalSec=21600 # Six hours  
app_name = 'spark_twitter_enrich_and_count_rm_01e'  

Make Kafka available to Jupyter

import os  
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 pyspark-shell'  
os.environ['PYSPARK_PYTHON'] = '/opt/conda/envs/python2/bin/python'  

Import dependencies

As well as Spark libraries, we're also bringing in the KafkaProducer library which will enable us to send messages to Kafka. This is in the kafka-python package. You can install this standalone on your system, or inline as done below.

# Necessary to make Kafka library available to pyspark
os.system("pip install kafka-python")

#    Spark
from pyspark import SparkContext  
#    Spark Streaming
from pyspark.streaming import StreamingContext  
from pyspark.streaming.kafka import KafkaUtils  
#    Kafka
from kafka import SimpleProducer, KafkaClient  
from kafka import KafkaProducer

#    json parsing
import json  
#    url deconstruction
from urlparse import urlsplit  
#    regex domain handling
import re  

Define values to match

filters=[]  
filters.append({"tag":"album","value": "Glory Days","match":["Glory Days","GloryDays"]})  
filters.append({"tag":"artist","value": "Little Mix","match":["Little Mix","LittleMix","Litel Mixx"]})  
filters.append({"tag":"track","value": "Shout Out To My Ex","match":["Shout Out To My Ex","Shout Out To My X","ShoutOutToMyEx","ShoutOutToMyX"]})  
filters.append({"tag":"track","value": "F.U.","match":["F.U","F.U.","FU","F U"]})  
filters.append({"tag":"track","value": "Touch","match":["Touch"]})  
filters.append({"tag":"track","value": "Oops","match":["Oops"]})  

Define whitelisted domains

domain_whitelist=[]  
domain_whitelist.append("itun.es")  
domain_whitelist.append("wikipedia.org")  
domain_whitelist.append("twitter.com")  
domain_whitelist.append("instagram.com")  
domain_whitelist.append("medium.com")  
domain_whitelist.append("spotify.com")  

Function: Unshorten shortened URLs (bit.ly etc)

# Source: http://stackoverflow.com/a/4201180/350613
import httplib  
import urlparse

def unshorten_url(url):  
    parsed = urlparse.urlparse(url)
    h = httplib.HTTPConnection(parsed.netloc)
    h.request('HEAD', parsed.path)
    response = h.getresponse()
    if response.status/100 == 3 and response.getheader('Location'):
        return response.getheader('Location')
    else:
        return url

Function: Send messages to Kafka

To inspect the Kafka topics as messages are sent use:

kafka-console-consumer --zookeeper cdh57-01-node-01.moffatt.me:2181 --topic twitter_matched2  

N.B. following the Design Patterns for using foreachRDD guide here.

# http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd

def send_to_kafka_matched(partition):  
    from kafka import SimpleProducer, KafkaClient
    from kafka import KafkaProducer

    kafka_prod = KafkaProducer(bootstrap_servers='cdh57-01-node-01.moffatt.me:9092')
    for record in partition:
        kafka_prod.send('twitter_matched2', str(json.dumps(record)))

def send_to_kafka_notmatched(partition):  
    from kafka import SimpleProducer, KafkaClient
    from kafka import KafkaProducer

    kafka_prod = KafkaProducer(bootstrap_servers='cdh57-01-node-01.moffatt.me:9092')
    for record in partition:
        kafka_prod.send('twitter_notmatched2', str(record))

def send_to_kafka_err(partition):  
    from kafka import SimpleProducer, KafkaClient
    from kafka import KafkaProducer

    kafka_prod = KafkaProducer(bootstrap_servers='cdh57-01-node-01.moffatt.me:9092')
    for record in partition:
        kafka_prod.send('twitter_err2', str(record))

Function: Process each tweet

This is the main processing code. It implements all of the logic described in the requirements above. If a processing condition is not met, the function returns a negative code and description of the condition that was not met. Errors are also caught and returned.

You can see a syntax-highlighted version of the code in the notebook here.

def process_tweet(tweet):  
    # Check that there's a URLs in the tweet before going any further
    if tweet.get('retweeted_status'):
        return (-1,'retweet - ignored',tweet)

    if not tweet.get('entities'):
        return (-2,'no entities element',tweet)

    if not tweet.get('entities').get('urls'):
        return (-3,'no entities.urls element',tweet)

    # Collect all the domains linked to in the tweet
    url_info={}
    url_info['domain']=[]
    url_info['primary_domain']=[]
    url_info['full_url']=[]
    try:
        for url in tweet['entities']['urls']:
            try:
                expanded_url = url['expanded_url']
            except Exception, err:
                return (-104,err,tweet)

            # Try to resolve the URL (assumes it's shortened - bit.ly etc)
            try:
                expanded_url = unshorten_url(expanded_url)
            except Exception, err:
                return (-108,err,tweet)

            # Determine the domain
            try:
                domain = urlsplit(expanded_url).netloc
            except Exception, err:
                return (-107,err,tweet)
            try:
                # Extract the 'primary' domain, e.g. www36.foobar.com -> foobar.com
                #
                # This logic is dodgy for UK domains (foo.co.uk, foo.org.uk, etc) 
                # since it truncates to the last two parts of the domain only (co.uk)
                #
                re_result = re.search('(\w+\.\w+)$',domain)
                if re_result:
                    primary_domain = re_result.group(0)
                else:
                    primary_domain = domain
            except Exception, err:
                return (-105,err,tweet)

            try:
                url_info['domain'].append(domain)
                url_info['primary_domain'].append(primary_domain)
                url_info['full_url'].append(expanded_url)
            except Exception, err:
                return (-106,err,tweet)


        # Check domains against the whitelist
        # If every domain found is in the whitelist, we can ignore them
        try:
            if set(url_info['primary_domain']).issubset(domain_whitelist):
                return (-8,'All domains whitelisted',tweet)
        except Exception, err:
            return (-103,err,tweet)

        # Check domains against the blacklist
        # Unless a domain is found, we ignore it
        #Only use this if you have first defined the blacklist!
        #if not set(domain_blacklist).intersection(url_info['primary_domain']):
        #    return (-9,'No blacklisted domains found',tweet)


    except Exception, err:
        return (-102,err,tweet)

    # Parse the tweet text against list of trigger terms
    # --------------------
    # This is rather messy iterative code that maybe can be optimised
    #
    # Because match terms are not just words, it's not enough to break
    #  up the tweet text into words and match against the filter list.
    #  Instead we have to take each filter term and see if it exists
    #  within the tweet text as a whole
    #
    # Using a set instead of list so that duplicates aren't added
    #
    matched=set()
    try:
        for f in filters:
            for a in f['match']:
                tweet_text = tweet['text']
                match_text = a.decode('utf-8')
                if match_text in tweet_text:
                    matched.add((f['tag'],f['value']))
    except Exception, err:
        return (-101,err,tweet)

    #-----
    # Add the discovered metadata into the tweet object that this function will return
    try:
        tweet['enriched']={}
        tweet['enriched']['media_details']={}
        tweet['enriched']['url_details']=url_info
        tweet['enriched']['match_count']=len(matched)
        for match in matched:
            tweet['enriched']['media_details'][match[0]]=match[1]

    except Exception, err:
        return (-100,err,tweet)

    return (len(matched),tweet)

Function: Streaming context definition

This is the function that defines the streaming context. It needs to be a function because we're using windowing and so the streaming context needs to be configured to checkpoint.

As well as processing inbound tweets, it performs counts of:

  • Inbound
  • Outbound, by type (match/no match/error)
  • For matched tweets, top domains and artists

The code is commented inline to explain how it works. You can see a syntax-highlighted version of the code in the notebook here.

def createContext():  
    sc = SparkContext(appName="spark_twitter_enrich_and_count_01")
    sc.setLogLevel("WARN")
    ssc = StreamingContext(sc, batchIntervalSec)

    # Define Kafka Consumer and Producer
    kafkaStream = KafkaUtils.createStream(ssc, 'cdh57-01-node-01.moffatt.me:2181', app_name, {'twitter':1})

    ## Get the JSON tweets payload
    ## >>TODO<<  This is very brittle - if the Kafka message retrieved is not valid JSON the whole thing falls over
    tweets_dstream = kafkaStream.map(lambda v: json.loads(v[1]))

    ## -- Inbound Tweet counts
    inbound_batch_cnt = tweets_dstream.count()
    inbound_window_cnt = tweets_dstream.countByWindow(windowIntervalSec,batchIntervalSec)

    ## -- Process
    ## Match tweet to trigger criteria
    processed_tweets = tweets_dstream.\
        map(lambda tweet:process_tweet(tweet))

    ## Send the matched data to Kafka topic
    ## Only treat it as a match if we hit at least two of the three possible matches (artist/track/album)
    ## 
    ## _The first element of the returned object is a count of the number of matches, or a negative 
    ##  to indicate an error or no URL content in the tweet._
    ## 
    ## _We only want to send the actual JSON as the output, so use a `map` to get just this element_

    matched_tweets = processed_tweets.\
                        filter(lambda processed_tweet:processed_tweet[0]>1).\
                        map(lambda processed_tweet:processed_tweet[1])

    matched_tweets.foreachRDD(lambda rdd: rdd.foreachPartition(send_to_kafka_matched))
    matched_batch_cnt = matched_tweets.count()
    matched_window_cnt = matched_tweets.countByWindow(windowIntervalSec,batchIntervalSec)

    ## Set up counts for matched metadata
    ##-- Artists
    matched_artists = matched_tweets.map(lambda tweet:(tweet['enriched']['media_details']['artist']))

    matched_artists_batch_cnt = matched_artists.countByValue()\
                                    .transform((lambda foo:foo.sortBy(lambda x:-x[1])))\
                                    .map(lambda x:"Batch/Artist: %s\tCount: %s" % (x[0],x[1]))

    matched_artists_window_cnt = matched_artists.countByValueAndWindow(windowIntervalSec,batchIntervalSec)\
                                    .transform((lambda foo:foo.sortBy(lambda x:-x[1])))\
                                    .map(lambda x:"Window/Artist: %s\tCount: %s" % (x[0],x[1]))

    ##-- Domains
    ## Since url_details.primary_domain is an array, need to flatMap here
    matched_domains = matched_tweets.flatMap(lambda tweet:(tweet['enriched']['url_details']['primary_domain']))

    matched_domains_batch_cnt = matched_domains.countByValue()\
                                    .transform((lambda foo:foo.sortBy(lambda x:-x[1])))\
                                    .map(lambda x:"Batch/Domain: %s\tCount: %s" % (x[0],x[1]))
    matched_domains_window_cnt = matched_domains.countByValueAndWindow(windowIntervalSec,batchIntervalSec)\
                                    .transform((lambda foo:foo.sortBy(lambda x:-x[1])))\
                                    .map(lambda x:"Window/Domain: %s\tCount: %s" % (x[0],x[1]))

    ## Display non-matches for inspection
    ## 
    ## Codes less than zero but greater than -100 indicate a non-match (e.g. whitelist hit), but not an error

    nonmatched_tweets = processed_tweets.\
        filter(lambda processed_tweet:(-99<=processed_tweet[0]<=1))

    nonmatched_tweets.foreachRDD(lambda rdd: rdd.foreachPartition(send_to_kafka_notmatched))

    nonmatched_batch_cnt = nonmatched_tweets.count()
    nonmatched_window_cnt = nonmatched_tweets.countByWindow(windowIntervalSec,batchIntervalSec)

    ##  Print any erroring tweets
    ## 
    ## Codes less than -100 indicate an error (try...except caught)

    errored_tweets = processed_tweets.\
                        filter(lambda processed_tweet:(processed_tweet[0]<=-100))
    errored_tweets.foreachRDD(lambda rdd: rdd.foreachPartition(send_to_kafka_err))

    errored_batch_cnt = errored_tweets.count()
    errored_window_cnt = errored_tweets.countByWindow(windowIntervalSec,batchIntervalSec)

    ## Print counts
    inbound_batch_cnt.map(lambda x:('Batch/Inbound: %s' % x))\
        .union(matched_batch_cnt.map(lambda x:('Batch/Matched: %s' % x))\
            .union(nonmatched_batch_cnt.map(lambda x:('Batch/Non-Matched: %s' % x))\
                .union(errored_batch_cnt.map(lambda x:('Batch/Errored: %s' % x)))))\
        .pprint()

    inbound_window_cnt.map(lambda x:('Window/Inbound: %s' % x))\
        .union(matched_window_cnt.map(lambda x:('Window/Matched: %s' % x))\
            .union(nonmatched_window_cnt.map(lambda x:('Window/Non-Matched: %s' % x))\
                .union(errored_window_cnt.map(lambda x:('Window/Errored: %s' % x)))))\
        .pprint()

    matched_artists_batch_cnt.pprint()
    matched_artists_window_cnt.pprint()
    matched_domains_batch_cnt.pprint()
    matched_domains_window_cnt.pprint()

    return ssc

Start the streaming context

ssc = StreamingContext.getOrCreate('/tmp/%s' % app_name,lambda: createContext())  
ssc.start()  
ssc.awaitTermination()  

Stream Output

Counters

From the stdout of the job we can see the simple counts of inbound and output splits, both per batch and accumulating window:

-------------------------------------------
Time: 2017-01-13 11:50:30
-------------------------------------------
Batch/Inbound: 9
Batch/Matched: 0
Batch/Non-Matched: 9
Batch/Errored: 0

-------------------------------------------
Time: 2017-01-13 11:50:30
-------------------------------------------
Window/Inbound: 9
Window/Non-Matched: 9

-------------------------------------------
Time: 2017-01-13 11:51:00
-------------------------------------------
Batch/Inbound: 21
Batch/Matched: 0
Batch/Non-Matched: 21
Batch/Errored: 0

-------------------------------------------
Time: 2017-01-13 11:51:00
-------------------------------------------
Window/Inbound: 30
Window/Non-Matched: 30

The details of identified artists within tweets is also tracked, per batch and accumulated over the window period (6 hours, in this example)

-------------------------------------------
Time: 2017-01-12 12:45:30
-------------------------------------------
Batch/Artist: Major Lazer       Count: 4
Batch/Artist: David Bowie       Count: 1

-------------------------------------------
Time: 2017-01-12 12:45:30
-------------------------------------------
Window/Artist: Major Lazer      Count: 1320
Window/Artist: Drake    Count: 379
Window/Artist: Taylor Swift     Count: 160
Window/Artist: Metallica        Count: 94
Window/Artist: David Bowie      Count: 84
Window/Artist: Lady Gaga        Count: 37
Window/Artist: Pink Floyd       Count: 11
Window/Artist: Kate Bush        Count: 10
Window/Artist: Justice  Count: 9
Window/Artist: The Weeknd       Count: 8

Kafka Output

Matched

The matched Kafka topic holds a stream of tweets in JSON format, with the discovered metadata (artist/album/track) added. I'm using the Kafka console consumer to view the contents, parsed through jq to show just the tweet text and metadata that has been added.

kafka-console-consumer --zookeeper cdh57-01-node-01.moffatt.me:2181 \  
--topic twitter_matched1 \
--from-beginning|jq -r "[.text,.enriched.url_details.primary_domain[0],.enriched.media_details.artist,.enriched.media_details.album,.enriched.media_details.track,.enriched.match_count] "
[
  "Million Reasons by Lady Gaga - this is horrendous sorry @ladygaga https://t.co/rEtePIy3OT",
  "youtube.com",
  "https://www.youtube.com/watch?v=NvMoctjjdhA&feature=youtu.be",
  "Lady Gaga",
  null,
  "Million Reasons",
  2
]

Non-Matched

On our Kafka topics outbound we can see the non-matched messages. Probably you'd disable this stream once the processing logic was finalised, but it's useful to be able to audit and validate the reasons for non-matches. Here a retweet is ignored, and we can see it's a retweet from the RT prefix of the text field. The -1 is the return code from the process_tweets function denoting a non-match:

(-1, 'retweet - ignored', {u'contributors': None, u'truncated': False, u'text': u'RT @ChartLittleMix: Little Mix adicionou datas para a Summer Shout Out 2017 no Reino Unido https://t.co/G4H6hPwkFm', u'@timestamp': u'2017-01-13T11:45:41.000Z', u'is_quote_status': False, u'in_reply_to_status_id': None, u'id': 819873048132288513, u'favorite_count': 0, u'source':

Summary

In this article we've built on the foundations of the initial exploration of Spark Streaming on Python, expanding it out to address a real-world processing requirement. Processing unbounded streams of data this way is not as complex as you may think, particularly for the benefits that it can yield in reducing the latencies between an event occuring and taking action from it.

We've not touched on some of the more complex areas, such as scaling this up to multiple Spark nodes, partitioned Kafka topics, and so on - that's another [kafka] topic (sorry...) for another day. The code itself is also rudimentary - before moving it into Production there'd be some serious refactoring and optimisation review to be performed on it.

You can find the notebook for this article here, and the previous article's here.

If you'd like more information on how Rittman Mead can help your business get the most of out its data, please do get in touch!