Category Archives: Rittman Mead

OBIEE 11g and Essbase – Faking Federation Using the GoURL

This blog is going to address what happens when we can’t take advantage of the Admin tool’s powerful vertical federation capabilities when integrating relational stars and Essbase cubes. In the Admin tool, a term often used interchangeably with the RPD, vertical federation is the process of integrating an aggregate data source, in this case Essbase, with a detail-level source from a data mart. This technique can increase query efficiency and decrease query time, and it has the added benefit of bringing together two powerful and dynamic reporting tools. But like most things, there is a pretty big caveat to this approach.

Before I jump into what that is, some housekeeping. To start, let’s make sure things don’t get lost in translation when going back and forth between Essbase and OBIEE jargon. In Essbase speak, dimensions can be thought of as tables in a relational structure, generations can be thought of as the columns in each table, and members are the values in each column.

Housekeeping done, now the caveat. Often, dimensions in Essbase cubes are built in a way that does not neatly support federation; that is, they have an uneven number of generations relative to their corresponding relational dimension. Federation is possible with a ragged hierarchical structure, but it can get kind of messy, and the final product doesn’t really look like something an Essbase-centric user community would readily and eagerly adopt. So what can we do when federation is out of the question?

Let’s frame the solution in the form of a not-atypical client scenario. Say a large financial institution needs to bring together the Essbase cubes it has used thus far for standardized reporting (balance sheets, income statements and the like) with its relational source, in order to drill to the account detail behind the numbers on those reports. They’ve got a pretty large user base that’s fairly entrenched and happy with Smart View and Excel for getting what they want from their cubes. And why shouldn’t they be? In most cases, OBIEE simply can’t support this level of functionality when reporting on an Essbase source. So in addition to these pretty big user adoption barriers to an OBIEE solution, we’ve now got technology limitations to contend with.

So what are our options when faced with this dilemma? How can we wow these skeptical users with near seamless functionality between sources? The secret lies with URL Action Links! While this solution is great for going from summary-level data in Essbase to its relational counterpart, it is also a great way to simply pass values from one subject area to another. There are definitely some tricks to setting this up, but more on those later. Read on.

The Scenario

In order to best demonstrate this solution, let’s set up a dashboard with two pages, one for each report, and a corresponding dashboard prompt. The primary source report, out of Essbase, will resemble a typical financial report, at least in structure. From this high-level chart, or a similar summary-level analysis, we’ll be able to drill to a detail report, out of a relational source, to identify the drivers behind any figures present on the analysis.

In this example, we’re going to use the 11.1.1.9 Sample App, going from the Sample Essbase subject area to the equivalent relational area, Sample Sales. Yes, you could federate these two, as they’ve done in Sample App; however, they’ll serve well to demonstrate how the concept could work for financial reporting against ragged or parent-child structures. Values for Product Type, for instance, could just as well be the descendants or children of a specific account. Also, the SampleApp Essbase GL subject area has no equivalent relational subject area we could use instead.

In the example below, we have a summary, month-level pivot table giving us a monthly sales trend. The user can prompt on the Year and Customer Segment through a dashboard prompt, but as you’ll see, this could easily be any number of prompts for your given scenario.

Monthly Trend Summary:

Solution 1:

In the sales trend example above, we are going to enable our user to click on a value for a revenue figure and then navigate to a detail report that shows products sold for the month by date. Again, this all must be done while passing any chosen parameters from both the dashboard prompt and analysis along to the detail analysis.

Proof of Concept

First, let’s start with the guts of the report example above. As you can see, there is quite a bit more under the hood than meets the eye. Let’s go over the approach piece by piece to help build a more thorough understanding of the method.

Step 1: Include the Columns!

So the idea here is that we want to pass any and all dimensional information associated with the revenue figure that we pick to a detail-level report that will be filtered on the set of parameters at the chosen intersection. We can hide these columns later, so your report won’t be a mess. I’ll add here that you might want to set any prompted values to be equal to the presentation variable on their respective dashboard prompt, with a default value set, as seen below. This will help to make the report digestible on the compound layout. The following picture shows the prompted values that drive our summary report on Year and Customer Segment. You can do this in the filters pane on the criteria tab with the following syntax:

 

All column values we want to pass need to be represented on the report:

Values that will be passed to the detail report (in this case, BizTech, Communication, Active Singles, 2012, and 2012 / 11):

Step 2: More Columns!

In addition to the columns that comprise the report, we need to add a second copy of every column we placed on the report in the first place. In the pictures above, you can see that these are the columns titled with the ‘URL’ prefix. In the column editor, wrap each column value in double quotes by concatenating the following string literal to either side of it (a single quote, then a double quote, then another single quote, with NO spaces between them):

'"' || "Table"."Column Name" || '"'

While this step may seem extraneous, you’ll see a bit later that it is all too necessary to successfully pass our column values through our URL Action Links. After you’ve created the custom columns, just group them along with their counterparts in the report, as in the pics above.
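To make the effect of that concatenation concrete, here is a minimal Python sketch of what the quoted 'URL' columns end up holding. The function name is made up for illustration; in OBIEE itself the work is done by the column formula, not by any code like this.

```python
def wrap_in_double_quotes(value: str) -> str:
    """Mimic the column formula: '"' || "Table"."Column Name" || '"'."""
    return '"' + value + '"'


# Two of the values from the example report, as their 'URL' columns would hold them.
quoted_year = wrap_in_double_quotes("2012")
quoted_month = wrap_in_double_quotes("2012 / 11")

print(quoted_year)   # "2012"
print(quoted_month)  # "2012 / 11"
```

Note that the quotes sit directly against the value with no padding, which is why the string literal in the formula must not contain spaces.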

Step 3: An Approach to Handling Hierarchies

In the previous pictures, you can see the products hierarchy that comprises the rows to the report. In order to pass any value from the hierarchy as well as its members we are going to have to include its respective generations in the rows as well. For our example, we’re going to use Brand, LOB, and Product Type. In this way, a user can select any sales value and have all three of these values passed as filter parameters to the detail analysis through a URL. You’ll notice that we haven’t given these columns a counterpart wrapped in quotes as you were told to do previously. This is quite on purpose, as we’ll see later. These columns will provide for another example on how to pass values without having to implement a second column for the purpose of wrapping the value in quotes.

 

When first placing the hierarchy on your analysis and expanding it to where you’d like it for the sake of the report, you can simply select all the column values, right click and then select ‘Keep Only’. This will establish a selection step under the Products Hierarchy to ensure that the report always opens to the specified structure from now on. So, that’s good for now, let’s get to the magic of this approach.

 

Step 4: Set Up the Action Link

In this case, we’re going to ‘drill’ off of the Sales column in our table, but we could really ‘drill’ off of anything, as you’ll see. So, pop open the Interaction tab for the column and select Action Links as our primary interaction. Edit that guy as follows (see URL procedure below). It used to be that we could do this via the ‘P’ parameters, however this method seems to be mostly deprecated in favor of the col/val method, as we shall utilize below.

URL Procedure

http://sampleapp.rittmanmead.com:7780/analytics/saw.dll? – Server URL*
Portal&Path=@{1} – path to dashboard
&Page=@{2} – dashboard page
&Action=@{3} – action to perform, in this case navigate (there are others)
&col1=@{4} – column from target analysis we wish to manipulate (our sales detail analysis)
&val1=@{5} – column from source analysis with which we are going to pass a filter parameter to target
&col2=@{6}
&val2=@{7}
&col3=@{8}
&val3=@{9}
&col4=@{10}
&val4="@{11}" – will discuss these quoted parameters later on
&col5=@{12}
&val5="@{13}"

*Note that this value can be made into a variable in order to be moved to different environments (DEV/TEST, etc…) while maintaining link integrity
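As a rough illustration of how the pieces of that URL fit together, the Python sketch below assembles the same Portal / Path / Page / Action / coln / valn structure from a list of column and value pairs. It is only a sketch under assumptions: the dashboard path, page name and the "dev" host are hypothetical, and percent-encoding the values with `urllib.parse.quote` is my choice here (a browser would normally do this for you).

```python
from urllib.parse import quote

# Only the sampleapp host comes from the post; the "dev" entry is a made-up
# placeholder showing how the server URL could vary by environment.
SERVER_URLS = {
    "sampleapp": "http://sampleapp.rittmanmead.com:7780/analytics/saw.dll",
    "dev": "http://dev.example.com:7780/analytics/saw.dll",  # hypothetical
}


def build_go_url(server_url, dashboard_path, page, filters):
    """Assemble Portal&Path=...&Page=...&Action=Navigate plus colN/valN pairs."""
    parts = [
        "Portal",
        "Path=" + quote(dashboard_path),
        "Page=" + quote(page),
        "Action=Navigate",
    ]
    for n, (column, value) in enumerate(filters, start=1):
        parts.append(f"col{n}=" + quote(column))
        # Each value travels wrapped in double quotes.
        parts.append(f"val{n}=" + quote('"' + value + '"'))
    return server_url + "?" + "&".join(parts)


url = build_go_url(
    SERVER_URLS["sampleapp"],
    "/shared/Demo/_portal/Sales Detail",  # hypothetical dashboard path
    "Detail",                             # hypothetical page name
    [('"A - Sample Sales"."Time"."T05 Per Name Year"', "2012")],
)
print(url)
```

Keeping the server URL in one lookup is the same idea as the note above: only that one value changes when the link moves between environments.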

The picture above details how to set up the URL link as described above. The col1 value is the column from the target analysis we want to filter using the value (val1) from our source. Be sure to qualify this column with the subject area from which it originates, in this case "A - Sample Sales".

Ex: "A - Sample Sales"."Time"."T05 Per Name Year"

Val1, as these parameters exist in ‘sets’, is the column from our source analysis we want to use to filter the target analysis. This is where our custom, quoted columns come into play. Instead of using the original column from our analysis, we’re going to use its quoted counterpart. This will ensure that any values passed through the URL will be enclosed in quotes, as is required by the URL. Note that we’re not using a value parameter in this case, but a column instead (the dropdown to the left of the text box).

Ex: '"' || "Time"."T05 Per Name Year" || '"'

You can proceed this way to pass as many values as you’d like to your detail analysis, with this coln, valn method. Again, just be sure that your columns are included in the source analysis or the values won’t get ported over. Once you’ve got all your columns and values set up, go ahead and enter them into the URL field in the Edit Action dialogue box, as above. Make sure you reference your variables using the proper syntax (similar to a presentation variable w/ an @ sign):

Ex: col1=@{4} – ‘4’ being the variable name (note that these can be named most anything)
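If it helps to picture what the `@{name}` references do, the following Python sketch models the substitution with a regular expression. This is purely a conceptual model with a hypothetical function name and a shortened template; OBIEE performs the real substitution itself when the action link fires.

```python
import re

# Shortened version of the URL from the procedure above.
URL_TEMPLATE = "saw.dll?Portal&Path=@{1}&Page=@{2}&Action=@{3}&col1=@{4}&val1=@{5}"


def fill_template(template, params):
    """Replace every @{name} with params[name]; an unknown name raises KeyError."""
    def lookup(match):
        return params[match.group(1)]
    return re.sub(r"@\{([^}]+)\}", lookup, template)


params = {
    "1": "/shared/Demo/_portal/Sales Detail",  # hypothetical dashboard path
    "2": "Detail",                             # hypothetical page name
    "3": "Navigate",
    "4": '"A - Sample Sales"."Time"."T05 Per Name Year"',
    "5": '"2012"',  # already quoted, as it would be coming from the 'URL' column
}
print(fill_template(URL_TEMPLATE, params))
```

Because the lookup is by name, the parameters do not have to be numbers; `@{year}` would work just as well in this model as long as the dictionary has a matching key.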

Quoting Parameters

As an alternative to including an extra copy of each column for the sake of passing quoted column values, we can instead put quotes around the parameter in our URL, as in the example above. The limitation to this method, however, is that you can only pass a single value, as in Year, for example. In later posts, we’ll address how to handle passing multiple values, as you might through a dashboard prompt.

Step 5: Set Up the Detail Analysis

For our detail analysis we’re going to set it up in much the same way as our summary. That is, we need to include the columns we want to filter on in the target report as well. Unfortunately, our target report won’t simply pick them up as filters from the filters pane without them also being included on the actual analysis. Again, any columns we don’t want visible to a user can be hidden. Below, we simply want to see the Calendar Date, Product, and Revenue, but filtered by all of our source analysis columns.

In the criteria view for our target, detail analysis, we need to make sure that we’re also setting any filtered columns to ‘is prompted’. This will ensure that our target analysis listens to any filter parameters passed through the URL from our source, summary analysis. As a last step, we must again fully qualify our filters, as in the picture below.

This picture shows our Year ‘is prompted’ filter on our target, detail analysis. Note that this column is also a column, albeit hidden, on this report as well. This will act as a filter on the analysis. It is being ‘prompted’ not by a dashboard prompt, in this instance, but by our source, summary analysis.
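One way to think about the ‘is prompted’ behaviour is the small Python model below: a passed value only filters the target rows if its column has been marked as prompted, and anything else is silently ignored. The rows, column names and function are all invented for illustration and say nothing about how the BI Server implements this internally.

```python
def apply_prompted_filters(rows, prompted_columns, passed_values):
    """Keep rows matching every passed value whose column is 'is prompted'."""
    active = {
        column: value
        for column, value in passed_values.items()
        if column in prompted_columns
    }
    return [
        row for row in rows
        if all(row.get(column) == value for column, value in active.items())
    ]


# Made-up detail rows standing in for the target analysis.
detail_rows = [
    {"Year": "2012", "Product": "Product A", "Revenue": 100},
    {"Year": "2011", "Product": "Product B", "Revenue": 250},
]
passed = {"Year": "2012"}

# Year is marked 'is prompted', so the passed value filters the rows.
print(apply_prompted_filters(detail_rows, {"Year"}, passed))
# Nothing is marked 'is prompted', so the passed value has no effect.
print(apply_prompted_filters(detail_rows, set(), passed))
```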

Step 6: Testing it All Out

Now that we’ve got all the pieces of the puzzle together, let’s see if it works! To QA this thing, let’s put a filter object on the target, detail analysis to make sure that the report is picking up on any values passed. So if we click on a sales value, we should be taken to the target analysis and see that all the parameters we set up were passed. The picture below confirms this!
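Alongside the filter object, it can be handy to sanity-check the link itself. The sketch below uses Python’s standard `urllib.parse` to pull the coln / valn pairs back out of a URL copied from the browser; the URL shown is a shortened, hypothetical example rather than one captured from the Sample App.

```python
from urllib.parse import parse_qs, urlsplit


def passed_filters(url):
    """Return {column: value} for each colN/valN pair found in the query string."""
    query = parse_qs(urlsplit(url).query, keep_blank_values=True)
    pairs = {}
    n = 1
    while f"col{n}" in query:
        pairs[query[f"col{n}"][0]] = query.get(f"val{n}", [""])[0]
        n += 1
    return pairs


# Hypothetical, shortened link of the kind the action link produces.
clicked_url = (
    "http://sampleapp.rittmanmead.com:7780/analytics/saw.dll"
    "?Portal&Path=/shared/Demo/_portal/Sales%20Detail&Page=Detail&Action=Navigate"
    "&col1=%22Time%22.%22T05%20Per%20Name%20Year%22&val1=%222012%22"
)
for column, value in passed_filters(clicked_url).items():
    print(column, "=", value)
```

If a column you expected is missing from the output, the usual suspect is that it was never included on the source analysis.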

Conclusion

Hopefully this can be one more trick to keep in the tool belt when faced with a similar scenario. If you have any hiccups in your implementation of this solution or other questions, please feel free to respond to this post. Stay tuned for additional articles related to this topic that go much more in depth. How do you handle passing multiple column values? How do I keep my report query time low with all those extra columns? How do I pass values using the presentation variable syntax? Can I use the Evaluate function to extract the descendants of a filtered column?

 

 

The post OBIEE 11g and Essbase – Faking Federation Using the GoURL appeared first on Rittman Mead Consulting.

Rittman Mead and Oracle Big Data Webcast Series – November 2015

We’re running a set of three webcasts together with Oracle on three popular use-cases for big data within an Oracle context – with the first one running tomorrow, November 3rd 2015 15:00 – 16:00 GMT / 16:00 – 17:00 CET on extending the data warehouse using Hadoop and NoSQL technologies.

The sessions are running over three weeks this month and look at ways we’re seeing Rittman Mead use big data technologies to extend the capabilities of their data warehouse, create analysis sandpits for analysing customer behaviour, and take data discovery into the Hadoop era using Oracle Big Data Discovery. All events are free to attend and we’re timing them to suit the UK, Europe and the US, with details of each webcast as follows:


Extending and Enhancing Your Data Warehouse to Address Big Data

Organizations with data warehouses are increasingly looking at big data technologies to extend the capacity of their platform, offload simple ETL and data processing tasks and add new capabilities to store and process unstructured data along with their existing relational datasets. In this presentation we’ll look at what’s involved in adding Hadoop and other big data technologies to your data warehouse platform, see how tools such as Oracle Data Integrator and Oracle Business Intelligence can be used to process and analyze new “big data” data sources, and look at what’s involved in creating a single query and metadata layer over both sources of data.

Audience: DBAs, DW managers, architects Tuesday 3rd November, 15:00 – 16:00 GMT / 16:00 – 17:00 CET – Click here to register


What is Big Data Discovery and how does it complement traditional Business Analytics?

Data Discovery is an analysis technique that complements traditional business analytics, and enables users to combine, explore and analyse disparate datasets to spot opportunities and patterns that lie hidden within their data. Oracle Big Data Discovery takes this idea and applies it to your unstructured and big data datasets, giving users a way to catalogue, join and then analyse all types of data across your organization. At the same time Oracle Big Data Discovery reduces the dependency on expensive and often difficult to find Data Scientists, opening up many Big Data tasks to “Citizen” Data Scientists. In this session we’ll look at Oracle Big Data Discovery and how it provides a “visual face” to your big data initiatives, and how it complements and extends the work that you currently do using business analytics tools.

Audience : Data analysts, market analysts, & Big Data project team members Tuesday 10th November, 15:00 – 16:00 GMT / 16:00 – 17:00 CET – Click here to register

Adding Big Data to your Organization to create true 360-Degree Customer Insight

Organisations are increasingly looking to “big data” to create a true, 360-degree view of their customer and market activity. Big data technologies such as Hadoop, NoSQL databases and predictive modelling make it possible now to bring highly granular data from all customer touch-points into a single repository and use that information to make better offers, create more relevant products and predict customer behaviour more accurately. In this session we’ll look at what’s involved in creating a customer 360-degree view using big data technologies on the Oracle platform, see how unstructured and social media sources can be added to more traditional transactional and customer attribute data, and how machine learning and predictive modelling techniques can then be used to classify, cluster and predict customer behaviour.

Audience : MI Managers, CX Managers, CIOs, BI / Analytics Managers Tuesday 24th November, 15:00 – 16:00 GMT / 16:00 – 17:00 CET – Click here to register

Oracle OpenWorld 2015 Roundup Part 2 : Data Integration, and Big Data (in the Cloud…)

In yesterday’s part one of our three-part Oracle Openworld 2015 round-up, we looked at the launch of OBIEE12c just before Openworld itself, and the new Data Visualisation Cloud Service that Thomas Kurian demo’d in his mid-week keynote. In part two we’ll look at what happened around data integration both on-premise and in the cloud, along with big data – and as you’ll see they’re two topics that are very much linked this year.

First off, data integration – and like OBIEE12c, ODI 12.2.1 got released a day or so before Openworld as part of the wider Oracle Fusion Middleware 12c Release 2 platform rollout. Some of what was coming in ODI12.2.1 got back-ported to ODI 12.1 earlier in the year in the form of the ODI Enterprise Edition Big Data Options, and we covered the new capabilities it gave ODI in terms of generating Pig and Spark mappings in a series of posts earlier in the year – adding Pig as an execution language gives ODI an ability to create dataflow-style mappings to go with Hive’s set-based transformations, whilst also opening-up access to the wide range of Pig-specific UDF libraries such as DataFu for log analysis. Spark, in the meantime, can be useful for smaller in-memory data transformation jobs and as we’ll see in a moment, lays the foundation for streaming and real-time ingestion capabilities.


The other key feature that ODI12.2.1 provides though is better integration with external source control systems. ODI already has some element of version control built in, but as it’s based around ODI’s own repository database tables it’s hard to integrate with more commonly-used enterprise source control tools such as Subversion or Git, and there’s no standard way to handle development concepts like branching, merging and so on. ODI 12.2.1 adds these concepts into core ODI and initially focuses on SVN as the external source control tool, with Git support planned in the near future.


Updates to GoldenGate, Enterprise Data Quality and Enterprise Metadata Management were also announced, whilst Oracle Big Data Preparation Cloud Service got its first proper outing since release earlier in the year. Big Data Preparation Cloud Service (BDP for short) to my mind suffers a bit from confusion over what it does and what market it serves – at some point it’s been positioned as a tool for the “citizen data scientist” as it enables data domain experts to wrangle and prepare data for loading into Hadoop, whilst at other times it’s labelled a tool for production data transformation jobs under the control of IT. What is misleading is the “big data” label – it runs on Hadoop and Spark but it’s not limited to big data use-cases, and as the slides below show it’s a great option for loading data into BI Cloud Service as an alternative to more IT-centric tools such as ODI.


It was another announcement though at Openworld that made Big Data Prep Service suddenly make a lot more sense – the announcement of a new initiative called Dataflow ML, something Oracle describe as “ETL 2.0” with an entirely cloud-based architecture and heavy use of machine learning (the “ML” in “Dataflow ML”) to automate much of the profiling and discovery process – the key innovation on Big Data Prep Service.


It’s early days for Dataflow ML but clearly this is the direction Oracle will want to take as applications and platforms move to the cloud. I called out ODI’s unsuitability for running in the cloud a couple of years ago and contrasted its architecture with that of cloud-native tools such as Snaplogic, and Dataflow ML is obviously Oracle’s bid to move data integration into the cloud. Couple that with innovations around Spark as the data processing platform and machine learning to automate routine tasks, and it sounds like it could be a winner – watch this space, as they say.

So the other area I wanted to cover in this second of three update pieces was big data. All of the key big data announcements from Oracle came at last year’s Openworld – Big Data Discovery, Big Data SQL, Big Data Prep Service (or Oracle Data Enrichment Cloud Service as it was called back then) – and this year’s event saw updates announced to Big Data SQL (Storage Indexes) and Big Data Discovery (general fit-and-finish enhancements). What is probably more significant though is the imminent availability of all this – plus Oracle Big Data Appliance – in Oracle’s Public Cloud.


Most big data PoCs I see outside of Oracle start on Amazon AWS and build-out from there – starting at very low-cost and moving from Amazon Elastic MapReduce to Cloudera CDH (via Cloudera Director), for example, or going from cloud to on-premise as the project moves into production. Oracle’s Big Data Cloud Service takes a different approach – instead of using a shared cloud infrastructure and potentially missing the point of Hadoop (single user access to lots of machines, vs. cloud’s timeshared access to slices of machines) Oracle instead effectively lease you a Big Data Appliance along with a bundle of software; the benefits being around performance but with quite a high startup cost vs. starting small with AWS.

The market will tell which approach gets most traction over time, but where Big Data Cloud Service does help tools like Big Data Discovery is that there are many more opportunities for integration, and customers will be much more open to an Oracle tool solution compared to those building on commodity hardware and community Hadoop distributions. To my mind every Big Data Cloud Service customer ought to buy BDD and most probably Big Data Prep Service, so as customers adopt cloud as a platform option for big data projects I’d expect an uptick in sales of Oracle’s big data tools.

On a related topic and looping back to Oracle Data Integration, the other announcement in this area that was interesting was around Spark Streaming support in Oracle Data Integrator 12c.


ODI12c has got some great batch-style capabilities around Hadoop but, as I talked about earlier in the year in an article on Flume, Morphlines and Cloudera Search, the market is all about real-time data ingestion now; batch is more for one-off historical data loads. Again like Dataflow ML this feature is in beta and probably won’t be out for many months, but when it comes out it’ll complete ODI’s capabilities around big data ingestion – we’re hoping to take part in the beta so keep an eye on the blog for news as it comes out.

So that’s it for part 2 of our Oracle Openworld 2015 update – we’ll complete the series tomorrow with a look at Oracle BI Applications, Oracle Database 12cR2 “sharding” and something very interesting planned for a future Oracle 12c database release – “Analytic Views”.

Oracle OpenWorld 2015 Roundup Part 1 : OBIEE12c and Data Visualisation Cloud Service

Last week saw Oracle Openworld 2015 running in San Francisco, USA, with Rittman Mead delivering a number of sessions around BI, data integration, Big Data and cloud. Several of us took part in Partner Advisory Councils on the Friday before Openworld itself, and along with the ACE Director briefings earlier that week we went into Openworld with a pretty good idea already on what was being announced – but as ever there were a few surprises and some sessions hidden away that were actually very significant in terms of where Oracle might be going – so let’s go through what we thought were the key announcements first, then we’ll get onto the more interesting stuff at the end.

And of course the key announcement for us and our customers was the general availability of OBIEE12c 12.2.1, which we described in a blog post at the time as being focused primarily on business agility and self-service – the primary drivers of BI license spend today. OBIEE12c came out the Friday before Openworld with availability across all supported Unix platforms as well as Linux and Windows, with this initial release not seeming massively different to 11g for developers and end-users at least at first glance – RPD development through the BI Administration tool is largely the same as 11g, at least for now; Answers and Dashboards has had a face-lift and uses a new flatter UI style called “Oracle Alta” but otherwise is recognisably similar to 11g, and the installer lays down Essbase and BI Publisher alongside OBIEE.


Under the covers though there are some key differences and improvements that will only become apparent after a while, or are really a foundation for much wider changes and improvements coming later in the 12c product timeline. The way you upload RPDs gives some hint of what’s to come – with 11g we used Enterprise Manager to upload new RPDs to the BI Server which then had to be restarted to pick up the new repository, whereas 12c has a separate utility for uploading RPDs and they’re not stored in quite the same way as before (more on this to come…). In addition there’s no longer any need to restart the BI Server (or cluster of BI Servers) to use the new repository, and the back-end has been simplified in lots of different ways all designed to enable cloning, provisioning and portability between on-premise and cloud, based around two new concepts of “service instances” and “BI Modules”. Expect to hear more about these over the next few years; the diagram below outlines 12c’s product architecture at a high level.


Of course there are two very obvious new front-end features in OBIEE12c, Visual Analyzer and data-mashups, but they require an extra net-new license on top of BI Foundation Suite to use in production. Visual Analyzer is Oracle’s answer to Tableau and adds data analysis, managed data discovery and data visualisation to OBIEE’s existing capabilities, but crucially uses OBIEE’s RPD as the primary data source for users’ analysis – in other words providing Tableau-like functionality but with a trusted, single source of data managed and curated centrally. Visual Analyzer is all about self-service and exploring datasets, and it’s here that the new data-mashup feature is really aimed – users can upload spreadsheets of additional measures and attributes to the core dataset used in their Visual Analyzer project, and blend or “mash-up” their data to create their own unique visualizations, as shown in the screenshot below:


Data Mashups are also available for the core Answers product but they’re primarily aimed at VA, and for more casual users where data visualisation is all they want and cloud is their ideal delivery platform, Oracle also released Data Visualisation Cloud Service (DVCS) – aka Visual-Analyzer-in-the-cloud.


To see DVCS in action, the Youtube video below shows just the business analytics part of Thomas Kurian’s session where DVCS links to Oracle’s Social Network Cloud Service to provide instant data visualisation and mashup capabilities all from the browser – pretty compelling if you ignore the Oracle Social Network part (is that ever used outside of Oracle?)

Think of DVCS as BICS with Answers, Dashboards and the RPD Model Builder stripped-out, all data instead uploaded from spreadsheets, half the price of BICS and first-in-line for new VA features as they become available. This “cloud first” strategy goes across the board for Oracle now – partly incentive to move to the cloud, mostly a reflection of how much easier it is to ship new features out when Oracle controls the installation. DVCS and BICS will see updates on a more or less monthly cycle now (see this MOS document that details new features added to BICS since initial availability, and this blog post from ourselves announcing VA and data mashups on BICS well before they became available on-premise). In fact we’re almost at the point where it’s conceivable that whole on-premise OBIEE systems can be moved into Oracle Cloud, with my main Openworld session on just this topic – the primary end-user benefit being first access to the usability, self-service and data viz capabilities Oracle are now adding to their BI platform.


Moreover, DVCS is probably just the start of a number of standalone, on-premise and cloud VA derivatives trying to capture the Tableau / Excel / PowerBI market. Pricing is more competitive than with BICS, but as Oracle move more downmarket with VA it’ll end up competing more head-to-head with Tableau on features, and PowerBI is just a tenth of the cost of DVCS. I see it more as a “land-and-expand” play with the aim being to trade the customer up to full BICS, or at least capture the segment of the market who’d otherwise go to Excel or Tableau desktop – it’ll be interesting to see how this one plays out.

So that’s it for Part 1 of our Oracle Openworld 2015 roundup – tomorrow we’ll look at data integration and big data.

Forays into Kafka 02 – Enabling Flexible Data Pipelines

One of the defining features of “Big Data” from a technologist’s point of view is the sheer number of tools and permutations at one’s disposal. Do you go Flume or Logstash? Avro or Thrift? Pig or Spark? Foo or Bar? (I made that last one up). This wealth of choice is wonderful because it means we can choose the right tool for the right job each time.

Of course, we need to establish that we have indeed chosen the right tool for the right job. But here’s the paradox. How do we easily work out if a tool is going to do what we want of it and is going to be a good fit, without disturbing what we already have in place? Particularly if it’s something that’s going to be part of an existing Productionised data pipeline, inserting a new tool partway through what’s there already is going to risk disrupting that. We potentially end up with a series of cloned environments, all diverging from each other, and not necessarily comparable (not to mention the overhead of the resource to host it all).

The same issue arises when we want to change the code or configuration of an existing pipeline. Bugs creep in, ideas to enhance the processing that you’ve currently got present themselves. Wouldn’t it be great if we could test these changes reliably and with no risk to the existing system?

This is where Kafka comes in. Kafka is very useful for two reasons:

  1. You can use it as a buffer for data that can be consumed and re-consumed on demand
  2. Multiple consumers can all pull the data, independently and at their own rate.

So you take your existing pipeline, plumb in Kafka, and then as and when you want to try out additional tools (or configurations of existing ones) you simply take another ‘tap’ off the existing store. This is an idea that Gwen Shapira put forward in May 2015, and it really resonated with me.
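To make the ‘tap’ idea concrete, here is a toy in-memory model in Python. It is not the Kafka client API and all of the names are invented; it only illustrates the two properties listed above, namely that messages are retained after being read and that each consumer tracks its own position.

```python
class TopicLog:
    """Toy stand-in for a Kafka topic: an append-only list plus per-consumer offsets."""

    def __init__(self):
        self._messages = []
        self._offsets = {}

    def append(self, message):
        self._messages.append(message)

    def poll(self, consumer, max_messages=1):
        """Return the next batch for this consumer and advance only its offset."""
        start = self._offsets.get(consumer, 0)
        batch = self._messages[start:start + max_messages]
        self._offsets[consumer] = start + len(batch)
        return batch

    def rewind(self, consumer):
        """Let a consumer re-consume everything from the beginning."""
        self._offsets[consumer] = 0


log = TopicLog()
for line in ["log line 1", "log line 2", "log line 3"]:
    log.append(line)

print(log.poll("existing-pipeline", max_messages=3))  # reads everything
print(log.poll("new-tool-trial", max_messages=1))     # a new tap starts from the beginning
```

Adding the second consumer did not change anything for the first one, which is the point: trying out a new tool is just another reader of the same retained data.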

I see Kafka sitting right on that Execution/Innovation demarcation line of the Information Management and Big Data Reference Architecture that Oracle and Rittman Mead produced last year:

Kafka enables us to build a pipeline for our analytics that breaks down into two phases:

  1. Data ingest from source into Kafka, simple and reliable. As few moving parts as possible.
  2. Post-processing. Batch or realtime. Uses Kafka as source. Re-runnable. Multiple parallel consumers:
    • Productionised processing into Event Engine, Data Reservoir and beyond
    • Adhoc/loosely controlled Data Discovery processing and re-processing

These two steps align with the idea of “Obtain” and “Scrub” that Rittman Mead’s Jordan Meyer talked about in his BI Forum 2015 Masterclass about Data Discovery:

So that’s the theory – let’s now look at an example of how Kafka can enable us to build a more flexible and productive data pipeline and environment.

Flume or Logstash? HDFS or Elasticsearch? … All of them!

Mark Rittman wrote back in April 2014 about using Apache Flume to stream logs from the Rittman Mead web server over to HDFS, from where they could be analysed in Hive and Impala. The basic setup looked like this:

Another route for analysing data is through the ELK stack. It does a similar thing – streams logs (with Logstash) in to a data store (Elasticsearch) from where they can be analysed, just with a different set of tools with a different emphasis on purpose. The input is the same – the web server log files. Let’s say I want to evaluate which is the better mechanism for analysing my log files, and compare the two side-by-side. Ultimately I might only want to go forward with one, but for now, I want to try both.

I could run them literally in parallel:

The disadvantage with this is that I have twice the ‘footprint’ on my data source, a Production server. A principle throughout all of this is that we want to remain light-touch on the sources of data. Whether a Production web server, a Production database, or whatever – upsetting the system owners of the data we want is never going to win friends.

An alternative to running in parallel would be to use one of the streaming tools to load data in place of the other, i.e.

or

The issue with this is I want to validate the end-to-end pipeline. Using a single source is better in terms of load/risk to the source system, but less so for validating my design. If I’m going to go with Elasticsearch as my target, Logstash would be the better fit source. Ditto HDFS/Flume. Both support connectors to the other, but using native capabilities always feels to me a safer option (particularly in the open-source world). And what if the particular modification I’m testing doesn’t support this kind of connectivity pattern?

Can you see where this is going? How about this:

The key points here are:

  1. One hit on the source system. In this case it’s Flume, but it could be Logstash, or another tool. This streams each line of the log file into Kafka in the exact order that it’s read.
  2. Kafka holds a copy of the log data, for a configurable time period. This could be days, or months – up to you and depending on purpose (and disk space!)
  3. Kafka is designed to be distributed and fault-tolerant. As with most of the boxes on this logical diagram it would be physically spread over multiple machines for capacity, performance, and resilience.
  4. The eventual targets, HDFS and Elasticsearch, are loaded by their respective tools pulling the web server entries exactly as they were on disk. In terms of validating end-to-end design we’re still doing that – we’re just pulling from a different source.

Another massively important benefit of Kafka is this:

Sooner or later (and if you’re new to the tool and code/configuration required, probably sooner) you’re going to get errors in your data pipeline. These may be fatal and cause it to fall in a heap, or they may be more subtle and you only realise after analysis that some of your data’s missing or not fully enriched. What to do? Obviously you need to re-run your ingest process. But how easy is that? Where is the source data? Maybe you’ll have a folder full of “.processed” source log files, or an HDFS folder of raw source data that you can reprocess. The issue here is the re-processing – you need to point your code at the alternative source, and work out the range of data to reprocess.

This is all eminently do-able of course – but wouldn’t it be easier just to rerun your existing ingest pipeline and just rewind the point at which it’s going to pull data from? Minimising the amount of ‘replumbing’ and reconfiguration to run a re-process job vs. new ingest makes it faster to do, and more reliable. Each additional configuration change is an opportunity to mis-configure. Each ‘shadow’ script clone for re-running vs normal processing is increasing the risk of code diverging and stale copies being run.

The final pipeline in this simple example looks like this:

  • The source server logs are streamed into Kafka, with a permanent copy up onto Amazon’s S3 for those real “uh oh” moments. Kafka, in a sandbox environment with a ham-fisted sysadmin, won’t be bullet-proof. Better to recover a copy from S3 than have to bother the Production server again. This is something I’ve put in for this specific use case, and wouldn’t be applicable in others.
  • From Kafka the web server logs are available to stream, as if natively from the web server disk itself, through Flume and Logstash.

There’s a variation on a theme of this, that looks like this:

Instead of Flume -> Kafka, and then a second Flume -> HDFS, we shortcut this and have the same Flume agent as is pulling from source writing to HDFS. Why have I not put this as the final pipeline? Because of this:

Let’s say that I want to do some kind of light-touch enrichment on the files, such as extracting the log timestamp in order to partition my web server logs in HDFS by the date of the log entry (not the time of processing, because I’m working with historical files too). I’m using a regex_extractor interceptor in Flume to determine the timestamp from the event data (log entry) being processed. That’s great, and it works well – when it works. If I get my regex wrong, or the log file changes date format, the house of cards comes tumbling down. Now I have a mess, because my nice clean ingest pipeline from the source system now needs fixing and re-running. As before, of course it is possible to write this cleanly so that it doesn’t break, etc etc, but from the point of view of decoupling operations for manageability and flexibility it makes sense to keep them separate (remember the Obtain vs Scrub point above?).
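As a rough illustration of what that enrichment step involves, here is a sketch in Python rather than Flume. It pulls the timestamp out of a log entry with the same style of regular expression used in the interceptor configuration later in this post, converts it to epoch milliseconds, and derives a year/month/day partition path. The function names are made up for this example. It also assumes an English locale for the month abbreviation and formats the path in UTC, which may not match the timezone a given Flume setup uses.

```python
import re
from datetime import datetime, timezone

# Same shape of pattern as the regex_extractor example; note that it only
# matches positive UTC offsets such as +0000.
TIMESTAMP_RE = re.compile(r"(\d{2}/[a-zA-Z]{3}/\d{4}:\d{2}:\d{2}:\d{2}\s\+\d{4})")


def event_millis(log_line):
    """Return the log entry's own timestamp as epoch milliseconds."""
    match = TIMESTAMP_RE.search(log_line)
    if match is None:
        # This is the "house of cards" case: wrong regex or changed format.
        raise ValueError("no timestamp found in log line")
    parsed = datetime.strptime(match.group(1), "%d/%b/%Y:%H:%M:%S %z")
    return int(parsed.timestamp() * 1000)


def partition_path(base, millis):
    """Build a base/YYYY/MM/DD path from epoch milliseconds (UTC assumed)."""
    day = datetime.fromtimestamp(millis / 1000, tz=timezone.utc)
    return f"{base}/{day:%Y/%m/%d}"


sample = ('76.164.194.74 - - [06/Apr/2014:03:38:07 +0000] "GET / HTTP/1.1" '
          '200 38281 "-" "Pingdom.com_bot_version_1.4_(http://www.pingdom.com/)"')
millis = event_millis(sample)
path = partition_path("/user/oracle/incoming/rm_logs/apache", millis)
```

A line that doesn’t match raises an error straight away, which is exactly the kind of failure you’d rather have in a re-runnable post-processing step than in the ingest from the source system.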

The final note on this is to point out that technically we can implement the pipeline using a Kafka Flume channel, which is a slightly neater way of doing things. The data still ends up in the S3 sink, and available in Kafka for streaming to all the consumers.

Kafka in Action

Let’s take a look at the configuration to put the above theory into practice. I’m running all of this on Oracle’s BigDataLite 4.2.1 VM which includes, amongst many other goodies, CDH 5.4.0. Alongside this I’ve installed into /opt:

  • apache-flume-1.6.0
  • elasticsearch-1.7.3
  • kafka_2.10-0.8.2.1
  • kibana-4.1.2-linux-x64
  • logstash-1.5.4

The Starting Point – Flume -> HDFS

First, we’ve got the initial Logs -> Flume -> HDFS configuration, similar to what Mark wrote about originally:

# http://flume.apache.org/FlumeUserGuide.html#exec-source  
source_agent.sources = apache_server  
source_agent.sources.apache_server.type = exec  
source_agent.sources.apache_server.command = tail -f /home/oracle/website_logs/access_log  
source_agent.sources.apache_server.batchSize = 1  
source_agent.sources.apache_server.channels = memoryChannel

# http://flume.apache.org/FlumeUserGuide.html#memory-channel  
source_agent.channels = memoryChannel  
source_agent.channels.memoryChannel.type = memory  
source_agent.channels.memoryChannel.capacity = 100

## Write to HDFS  
source_agent.sinks = hdfs_sink  
source_agent.sinks.hdfs_sink.type = hdfs  
source_agent.sinks.hdfs_sink.channel = memoryChannel  
source_agent.sinks.hdfs_sink.hdfs.path = /user/oracle/incoming/rm_logs/apache_log  
source_agent.sinks.hdfs_sink.hdfs.fileType = DataStream  
source_agent.sinks.hdfs_sink.hdfs.writeFormat = Text  
source_agent.sinks.hdfs_sink.hdfs.rollSize = 0  
source_agent.sinks.hdfs_sink.hdfs.rollCount = 10000  
source_agent.sinks.hdfs_sink.hdfs.rollInterval = 600

After running this

$ /opt/apache-flume-1.6.0-bin/bin/flume-ng agent --name source_agent \
--conf-file flume_website_logs_02_tail_source_hdfs_sink.conf

we get the logs appearing in HDFS and can see them easily in Hue:

Adding Kafka to the Pipeline

Let’s now add Kafka to the mix. I’ve already set up and started Kafka (see here for how), and Zookeeper’s already running as part of the default BigDataLite build.

First we need to define a Kafka topic that is going to hold the log files. In this case it’s called apache_logs:

$ /opt/kafka_2.10-0.8.2.1/bin/kafka-topics.sh --zookeeper bigdatalite:2181 \
--create --topic apache_logs --replication-factor 1 --partitions 1

Just to prove it’s there and we can send/receive messages on it I’m going to use the Kafka console producer/consumer to test it. Run these in two separate windows:

$ /opt/kafka_2.10-0.8.2.1/bin/kafka-console-producer.sh \
--broker-list bigdatalite:9092 --topic apache_logs

$ /opt/kafka_2.10-0.8.2.1/bin/kafka-console-consumer.sh \
--zookeeper bigdatalite:2181 --topic apache_logs

With the Consumer running enter some text, any text, in the Producer session and you should see it appear almost immediately in the Consumer window.

Now that we’ve validated the Kafka topic, let’s plumb it in. We’ll switch the existing Flume config to use a Kafka sink, and then add a second Flume agent to do the Kafka -> HDFS bit, giving us this:

The original flume agent configuration now looks like this:

source_agent.sources = apache_log_tail  
source_agent.channels = memoryChannel  
source_agent.sinks = kafka_sink

# http://flume.apache.org/FlumeUserGuide.html#exec-source  
source_agent.sources.apache_log_tail.type = exec  
source_agent.sources.apache_log_tail.command = tail -f /home/oracle/website_logs/access_log  
source_agent.sources.apache_log_tail.batchSize = 1  
source_agent.sources.apache_log_tail.channels = memoryChannel

# http://flume.apache.org/FlumeUserGuide.html#memory-channel  
source_agent.channels.memoryChannel.type = memory  
source_agent.channels.memoryChannel.capacity = 100

## Write to Kafka  
source_agent.sinks.kafka_sink.channel = memoryChannel  
source_agent.sinks.kafka_sink.type = org.apache.flume.sink.kafka.KafkaSink  
source_agent.sinks.kafka_sink.batchSize = 5  
source_agent.sinks.kafka_sink.brokerList = bigdatalite:9092  
source_agent.sinks.kafka_sink.topic = apache_logs

Restart the kafka-console-consumer.sh from above so that you can see what’s going into Kafka, and then run the Flume agent. You should see the log entries appearing soon after. Remember that kafka-console-consumer.sh is just one consumer of the logs – when we plug in the Flume consumer to write the logs to HDFS we can opt to pick up all of the entries in Kafka, completely independently of what we have or haven’t consumed in kafka-console-consumer.sh.

$ /opt/apache-flume-1.6.0-bin/bin/flume-ng agent --name source_agent \
--conf-file flume_website_logs_03_tail_source_kafka_sink.conf

[oracle@bigdatalite ~]$ /opt/kafka_2.10-0.8.2.1/bin/kafka-console-consumer.sh \
--zookeeper bigdatalite:2181 --topic apache_logs

37.252.227.70 - - [06/Sep/2015:08:08:30 +0000] "GET / HTTP/1.0" 301 235 "-" "Mozilla/5.0 (compatible; monitis.com - free monitoring service; http://monitis.com)"  
174.121.162.130 - - [06/Sep/2015:08:08:35 +0000] "HEAD /blog HTTP/1.1" 301 - "http://oraerp.com/blog" "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident/5.0)"  
177.71.183.71 - - [06/Sep/2015:08:08:35 +0000] "GET /blog/ HTTP/1.0" 200 145999 "-" "Mozilla/5.0 (compatible; monitis - premium monitoring service; http://www.monitis.com)"  
174.121.162.130 - - [06/Sep/2015:08:08:36 +0000] "HEAD /blog/ HTTP/1.1" 200 - "http://oraerp.com/blog" "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident/5.0)"  
173.192.34.91 - - [06/Sep/2015:08:08:44 +0000] "GET / HTTP/1.0" 301 235 "-" "Mozilla/5.0 (compatible; monitis.com - free monitoring service; http://monitis.com)"  
217.146.9.53 - - [06/Sep/2015:08:08:58 +0000] "GET / HTTP/1.0" 301 235 "-" "Mozilla/5.0 (compatible; monitis - premium monitoring service; http://www.monitis.com)"  
82.47.31.235 - - [06/Sep/2015:08:08:58 +0000] "GET / HTTP/1.1" 200 36946 "-" "Echoping/6.0.2"

Set up the second Flume agent to use Kafka as a source, and HDFS as the target just as it was before we added Kafka into the pipeline:

target_agent.sources = kafkaSource  
target_agent.channels = memoryChannel  
target_agent.sinks = hdfsSink 

target_agent.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource  
target_agent.sources.kafkaSource.zookeeperConnect = bigdatalite:2181  
target_agent.sources.kafkaSource.topic = apache_logs  
target_agent.sources.kafkaSource.batchSize = 5  
target_agent.sources.kafkaSource.batchDurationMillis = 200  
target_agent.sources.kafkaSource.channels = memoryChannel

# http://flume.apache.org/FlumeUserGuide.html#memory-channel  
target_agent.channels.memoryChannel.type = memory  
target_agent.channels.memoryChannel.capacity = 100

## Write to HDFS  
#http://flume.apache.org/FlumeUserGuide.html#hdfs-sink  
target_agent.sinks.hdfsSink.type = hdfs  
target_agent.sinks.hdfsSink.channel = memoryChannel  
target_agent.sinks.hdfsSink.hdfs.path = /user/oracle/incoming/rm_logs/apache_log  
target_agent.sinks.hdfsSink.hdfs.fileType = DataStream  
target_agent.sinks.hdfsSink.hdfs.writeFormat = Text  
target_agent.sinks.hdfsSink.hdfs.rollSize = 0  
target_agent.sinks.hdfsSink.hdfs.rollCount = 10000  
target_agent.sinks.hdfsSink.hdfs.rollInterval = 600

Fire up the agent:

$ /opt/apache-flume-1.6.0-bin/bin/flume-ng agent -n target_agent \
-f flume_website_logs_04_kafka_source_hdfs_sink.conf

and as the website log data streams in to Kafka (from the first Flume agent) you should see the second Flume agent sending it to HDFS and evidence of this in the console output from Flume:

15/10/27 13:53:53 INFO hdfs.BucketWriter: Creating /user/oracle/incoming/rm_logs/apache_log/FlumeData.1445954032932.tmp

and in HDFS itself:

Play it again, Sam?

All we’ve done to this point is add Kafka into the pipeline, ready for subsequent use. We’ve not changed the nett output of the data pipeline. But, we can now benefit from having Kafka there, by re-running some of our HDFS load without having to go back to the source files. Let’s say we want to partition the logs as we store them. But, we don’t want to disrupt the existing processing. How? Easy! Just create another Flume agent with the additional configuration in to do the partitioning.

target_agent.sources = kafkaSource  
target_agent.channels = memoryChannel  
target_agent.sinks = hdfsSink

target_agent.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource  
target_agent.sources.kafkaSource.zookeeperConnect = bigdatalite:2181  
target_agent.sources.kafkaSource.topic = apache_logs  
target_agent.sources.kafkaSource.batchSize = 5  
target_agent.sources.kafkaSource.batchDurationMillis = 200  
target_agent.sources.kafkaSource.channels = memoryChannel  
target_agent.sources.kafkaSource.groupId = new  
target_agent.sources.kafkaSource.kafka.auto.offset.reset = smallest  
target_agent.sources.kafkaSource.interceptors = i1

# http://flume.apache.org/FlumeUserGuide.html#memory-channel  
target_agent.channels.memoryChannel.type = memory  
target_agent.channels.memoryChannel.capacity = 1000

# Regex Interceptor to set timestamp so that HDFS can be written to partitioned  
target_agent.sources.kafkaSource.interceptors.i1.type = regex_extractor  
target_agent.sources.kafkaSource.interceptors.i1.serializers = s1  
target_agent.sources.kafkaSource.interceptors.i1.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer  
target_agent.sources.kafkaSource.interceptors.i1.serializers.s1.name = timestamp  
#
# Match this format logfile to get timestamp from it:  
# 76.164.194.74 - - [06/Apr/2014:03:38:07 +0000] "GET / HTTP/1.1" 200 38281 "-" "Pingdom.com_bot_version_1.4_(http://www.pingdom.com/)"  
target_agent.sources.kafkaSource.interceptors.i1.regex = (\d{2}\/[a-zA-Z]{3}\/\d{4}:\d{2}:\d{2}:\d{2}\s\+\d{4})  
target_agent.sources.kafkaSource.interceptors.i1.serializers.s1.pattern = dd/MMM/yyyy:HH:mm:ss Z  
#

## Write to HDFS  
#http://flume.apache.org/FlumeUserGuide.html#hdfs-sink  
target_agent.sinks.hdfsSink.type = hdfs  
target_agent.sinks.hdfsSink.channel = memoryChannel  
target_agent.sinks.hdfsSink.hdfs.path = /user/oracle/incoming/rm_logs/apache/%Y/%m/%d/access_log  
target_agent.sinks.hdfsSink.hdfs.fileType = DataStream  
target_agent.sinks.hdfsSink.hdfs.writeFormat = Text  
target_agent.sinks.hdfsSink.hdfs.rollSize = 0  
target_agent.sinks.hdfsSink.hdfs.rollCount = 0  
target_agent.sinks.hdfsSink.hdfs.rollInterval = 600

The important lines of note here (as highlighted above) are:

  • the regex_extractor interceptor which determines the timestamp of the log event, then used in the hdfs.path partitioning structure
  • the groupId and kafka.auto.offset.reset configuration items for the kafkaSource.
    • The groupId ensures that this flume agent’s offset in the consumption of the data in the Kafka topic is maintained separately from that of the original agent that we had. By default it is flume, and here I’m overriding it to new. It’s a good idea to specify this explicitly in all Kafka flume consumer configurations to avoid complications.
    • kafka.auto.offset.reset tells the consumer that if no existing offset is found (which it won’t be, if the groupId is a new one) to start from the beginning of the data rather than the end (which is what it will do by default).
    • Thus if you want to get Flume to replay the contents of a Kafka topic, just set the groupId to an unused one (eg ‘foo01’, ‘foo02’, etc) and make sure the kafka.auto.offset.reset is smallest
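The rule those two settings implement can be sketched in a few lines of Python. This is a simplified model of the behaviour described above, not Kafka or Flume code: the function name, the committed dictionary and the offset numbers are all invented for the example.

```python
def starting_offset(committed, group_id, auto_offset_reset, earliest, latest):
    """Decide where a consumer group starts reading (simplified model).

    committed         -- dict of group id -> last stored offset (hypothetical)
    auto_offset_reset -- "smallest" or "largest", as in the Kafka 0.8 consumer
    earliest, latest  -- first and next-to-be-written offsets in the topic
    """
    if group_id in committed:
        # A known group always resumes where it left off;
        # auto.offset.reset is not consulted at all.
        return committed[group_id]
    if auto_offset_reset == "smallest":
        return earliest   # brand new group: replay everything retained
    if auto_offset_reset == "largest":
        return latest     # brand new group: only see new messages
    raise ValueError("unknown auto_offset_reset: " + auto_offset_reset)


committed = {"flume": 4200}  # the original agent has already consumed data

resume = starting_offset(committed, "flume", "smallest", 0, 5000)
replay = starting_offset(committed, "new", "smallest", 0, 5000)
tail_only = starting_offset(committed, "new", "largest", 0, 5000)
```

This is why changing kafka.auto.offset.reset on its own does nothing for an agent that has already run: its groupId has a stored offset, so the reset setting never comes into play.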

Now run it (concurrently with the existing flume agents if you want):

$ /opt/apache-flume-1.6.0-bin/bin/flume-ng agent -n target_agent \
-f flume_website_logs_07_kafka_source_partitioned_hdfs_sink.conf

You should see a flurry of activity (or not, depending on how much data you’ve already got in Kafka), and some nicely partitioned apache logs in HDFS:

Crucially, the existing Flume agent and non-partitioned HDFS pipeline stays in place and functioning exactly as it was – we’ve not had to touch it. We could then run the two side-by-side until we’re happy the partitioning is working correctly and then decommission the first. Even at this point we have the benefit of Kafka, because we just turn off the original HDFS-writing agent – the new “live” one continues to run, it doesn’t need reconfiguring. We’ve validated the actual configuration we’re going to use for real, we’ve not had to simulate it with mock data sources that then need re-plumbing prior to real use.

Clouds and Channels

We’re going to evolve the pipeline a bit now. We’ll go back to a single Flume agent writing to HDFS, but add in Amazon’s S3 as the target for the unprocessed log files. The point here is not so much that S3 is the best place to store log files (although it is a good option), but as a way to demonstrate a secondary method of keeping your raw data available without impacting the source system. It also fits nicely with using the Kafka flume channel to tighten the pipeline up a tad:

Amazon’s S3 service is not HDFS, but Hadoop ships with filesystem connectors for it, so Flume’s HDFS sink can use the S3N protocol to write directly to it. You need to have already set up your S3 ‘bucket’, and have the appropriate AWS Access Key ID and Secret Key. To get this to work I added these credentials to /etc/hadoop/conf.bigdatalite/core-site.xml (I tried specifying them inline with the Flume configuration but with no success):

<property>  
    <name>fs.s3n.awsAccessKeyId</name>  
    <value>XXXXXXXXXXXXX</value>  
</property>  
<property>  
    <name>fs.s3n.awsSecretAccessKey</name>  
    <value>YYYYYYYYYYYYYYYYYYYY</value>  
</property>

Once you’ve set up the bucket and credentials, the original flume agent (the one pulling the actual web server logs) can be amended:

source_agent.sources = apache_log_tail  
source_agent.channels = kafkaChannel  
source_agent.sinks = s3Sink

# http://flume.apache.org/FlumeUserGuide.html#exec-source  
source_agent.sources.apache_log_tail.type = exec  
source_agent.sources.apache_log_tail.command = tail -f /home/oracle/website_logs/access_log  
source_agent.sources.apache_log_tail.batchSize = 1  
source_agent.sources.apache_log_tail.channels = kafkaChannel


## Write to Kafka Channel  
source_agent.channels.kafkaChannel.channel = kafkaChannel  
source_agent.channels.kafkaChannel.type = org.apache.flume.channel.kafka.KafkaChannel  
source_agent.channels.kafkaChannel.topic = apache_logs  
source_agent.channels.kafkaChannel.brokerList = bigdatalite:9092  
source_agent.channels.kafkaChannel.zookeeperConnect = bigdatalite:2181

## Write to S3  
source_agent.sinks.s3Sink.channel = kafkaChannel  
source_agent.sinks.s3Sink.type = hdfs  
source_agent.sinks.s3Sink.hdfs.path = s3n://rmoff-test/apache  
source_agent.sinks.s3Sink.hdfs.fileType = DataStream  
source_agent.sinks.s3Sink.hdfs.filePrefix = access_log  
source_agent.sinks.s3Sink.hdfs.writeFormat = Text  
source_agent.sinks.s3Sink.hdfs.rollCount = 10000  
source_agent.sinks.s3Sink.hdfs.rollSize = 0  
source_agent.sinks.s3Sink.hdfs.batchSize = 10000  
source_agent.sinks.s3Sink.hdfs.rollInterval = 600

Here the source is the same as before (server logs), but the channel is now Kafka itself, and the sink S3. Using Kafka as the channel has the nice benefit that the data is now already in Kafka, we don’t need that as an explicit target in its own right.

Restart the source agent using this new configuration:

$ /opt/apache-flume-1.6.0-bin/bin/flume-ng agent --name source_agent \
--conf-file flume_website_logs_09_tail_source_kafka_channel_s3_sink.conf

and you should get the data appearing on both HDFS as before, and now also in the S3 bucket:

Didn’t Someone Say Logstash?

The premise at the beginning of this exercise was that I could extend an existing data pipeline to pull data into a new set of tools, as if from the original source, but without touching that source or the existing configuration in place. So far we’ve got a pipeline that is pretty much as we started with, just with Kafka in there now and an additional feed to S3:

Now we’re going to extend (or maybe “broaden” is a better term) the data pipeline to add Elasticsearch into it:

Whilst Flume can write to Elasticsearch given the appropriate extender, I’d rather use a tool much closer to Elasticsearch in origin and direction – Logstash. Logstash supports Kafka as an input (and an output, if you want), making the configuration ridiculously simple. To smoke-test the configuration just run Logstash with this configuration:

input {  
        kafka {  
                zk_connect => 'bigdatalite:2181'  
                topic_id => 'apache_logs'  
                codec => plain {  
                        charset => "ISO-8859-1"  
                }
                # Use both the following two if you want to reset processing  
                reset_beginning => 'true'  
                auto_offset_reset => 'smallest'

        }  
}

output {  
        stdout {codec => rubydebug }  
        }

A few things to point out in the input configuration:

  • You need to specify the plain codec (assuming your input from Kafka is plain text). The default codec for the Kafka plugin is json, and Logstash does NOT like trying to parse plain text as json, as I found out:

    37.252.227.70 - - [06/Sep/2015:08:08:30 +0000] "GET / HTTP/1.0" 301 235 "-" "Mozilla/5.0 (compatible; monitis.com - free monitoring service; http://monitis.com)" {:exception=>#<NoMethodError: undefined method `[]' for 37.252:Float>, :backtrace=>["/opt/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-core-1.5.4-java/lib/logstash/event.rb:73:in `initialize'", "/opt/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-codec-json-1.0.1/lib/logstash/codecs/json.rb:46:in `decode'", "/opt/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-1.0.0/lib/logstash/inputs/kafka.rb:169:in `queue_event'", "/opt/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-1.0.0/lib/logstash/inputs/kafka.rb:139:in `run'", "/opt/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-core-1.5.4-java/lib/logstash/pipeline.rb:177:in `inputworker'", "/opt/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-core-1.5.4-java/lib/logstash/pipeline.rb:171:in `start_input'"], :level=>:error}

  • As well as specifying the codec, I needed to specify the charset. Without this I got \u0000\xBA\u0001 at the beginning of each message that Logstash pulled from Kafka

  • Specifying reset_beginning and auto_offset_reset tells Logstash to pull everything in from Kafka, rather than starting at the latest offset.

When you run the configuration file above you should see a stream of messages to your console of everything that is in the Kafka topic:

$ /opt/logstash-1.5.4/bin/logstash -f logstash-apache_10_kafka_source_console_output.conf

The output will look like this – note that Logstash has added its own special @version and @timestamp fields:

{  
       "message" => "203.199.118.224 - - [09/Oct/2015:04:13:23 +0000] \"GET /wp-content/uploads/2014/10/JFB-View-Selector-LowRes-300x218.png HTTP/1.1\" 200 53295 \"http://www.rittmanmead.com/2014/10/obiee-how-to-a-view-selector-for-your-dashboard/\" \"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36\"",  
      "@version" => "1",  
    "@timestamp" => "2015-10-27T17:29:06.596Z"  
}

Having proven the Kafka-Logstash integration, let’s do something useful – get all those lovely log entries streaming from source, through Kafka, enriched in Logstash with things like geoip, and finally stored in Elasticsearch:

input {  
        kafka {  
                zk_connect => 'bigdatalite:2181'  
                topic_id => 'apache_logs'  
                codec => plain {  
                        charset => "ISO-8859-1"  
                }
                # Use both the following two if you want to reset processing  
                reset_beginning => 'true'  
                auto_offset_reset => 'smallest'  
        }
}


filter {  
        # Parse the message using the pre-defined "COMBINEDAPACHELOG" grok pattern  
        grok { match => ["message","%{COMBINEDAPACHELOG}"] }

        # Ignore anything that's not a blog post hit, characterised by /yyyy/mm/post-slug form  
        if [request] !~ /^\/[0-9]{4}\/[0-9]{2}\/.*$/ { drop{} }

        # From the blog post URL, strip out the year/month and slug  
        #  http://www.rittmanmead.com/2015/02/obiee-monitoring-and-diagnostics-with-influxdb-and-grafana/  
        #     year  => 2015  
        #     month =>   02  
        #     slug  => obiee-monitoring-and-diagnostics-with-influxdb-and-grafana  
        grok { match => [ "request","/%{NUMBER:post-year}/%{NUMBER:post-month}/(%{NUMBER:post-day}/)?%{DATA:post-slug}(/.*)?$"] }

        # Combine year and month into one field  
        mutate { replace => [ "post-year-month" , "%{post-year}-%{post-month}" ] }

        # Use GeoIP lookup to locate the visitor's town/country  
        geoip { source => "clientip" }

        # Store the date of the log entry (rather than now) as the event's timestamp  
        date { match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]}  
}

output {  
        elasticsearch { host => "bigdatalite"  index => "blog-apache-%{+YYYY.MM.dd}"}  
        }
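For anyone who finds grok patterns hard to read, the core of that filter block can be approximated in Python. This is only a sketch of the logic (drop anything that isn’t a blog post hit, then split the URL into year, month and slug). The regular expressions are hand-written stand-ins for the grok NUMBER and DATA patterns, which are more permissive, and the enrich function is a made-up name.

```python
import re

# Rough equivalent of the conditional: /yyyy/mm/... means a blog post hit.
BLOG_POST_RE = re.compile(r"^/[0-9]{4}/[0-9]{2}/.*$")

# Rough equivalent of the second grok: year, month, optional day, then slug.
POST_PARTS_RE = re.compile(
    r"^/(?P<year>\d{4})/(?P<month>\d{2})/(?:(?P<day>\d+)/)?(?P<slug>.*?)(?:/.*)?$"
)


def enrich(request):
    """Return the extra fields for a blog post request, or None to drop it."""
    if not BLOG_POST_RE.match(request):
        return None  # same effect as drop{} in the Logstash filter
    parts = POST_PARTS_RE.match(request)
    if parts is None:
        return None
    return {
        "post-year": parts.group("year"),
        "post-month": parts.group("month"),
        "post-slug": parts.group("slug"),
        # Same idea as the mutate/replace step
        "post-year-month": parts.group("year") + "-" + parts.group("month"),
    }


post = enrich("/2015/02/obiee-monitoring-and-diagnostics-with-influxdb-and-grafana/")
image = enrich("/wp-content/uploads/2014/10/JFB-View-Selector-LowRes-300x218.png")
```

Here post carries the year, month and slug fields, while image is None because the request path doesn’t start with a year and month.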

Make sure that Elasticsearch is running and then kick off Logstash:

$ /opt/logstash-1.5.4/bin/logstash -f logstash-apache_01_kafka_source_parsed_to_es.conf

Nothing will appear to happen on the console:

log4j, [2015-10-27T17:36:53.228]  WARN: org.elasticsearch.bootstrap: JNA not found. native methods will be disabled.  
Logstash startup completed

But in the background Elasticsearch will be filling up with lots of enriched log data. You can confirm this through the useful kopf plugin to see that the Elasticsearch indices are being created:

and directly through Elasticsearch’s RESTful API too:

$ curl -s -XGET 'http://bigdatalite:9200/_cat/indices?v' | sort
health status index                  pri rep docs.count docs.deleted store.size pri.store.size  
yellow open   blog-apache-2015.09.30   5   1      11872            0       11mb           11mb  
yellow open   blog-apache-2015.10.01   5   1      13679            0     12.8mb         12.8mb  
yellow open   blog-apache-2015.10.02   5   1      10042            0      9.6mb          9.6mb  
yellow open   blog-apache-2015.10.03   5   1       8722            0      7.3mb          7.3mb

And of course, the whole point of streaming the data into Elasticsearch in the first place – easy analytics through Kibana:

Conclusion

Kafka is awesome :-D

We’ve seen in this article how Kafka enables the implementation of flexible data pipelines that can evolve organically without requiring system rebuilds to implement or test new methods. It allows the data discovery function to tap in to the same source of data as the more standard analytical reporting one, without risking impacting the source system at all.